Implements memory store as a simple alternative to RocksDB

Drops InMemoryGraph in favor of the new memory store
pull/10/head
Tpt 6 years ago
parent c48638d179
commit 38a8f97254
  1. 15
      src/errors.rs
  2. 119
      src/store/isomorphism.rs
  3. 524
      src/store/memory.rs
  4. 41
      src/store/mod.rs
  5. 16
      src/store/numeric_encoder.rs
  6. 6
      src/store/rocksdb.rs
  7. 524
      src/store/rocksdb/storage.rs
  8. 238
      src/store/store.rs
  9. 79
      tests/rdf_test_cases.rs

@ -1,3 +1,5 @@
use std::fmt;
use std::sync::PoisonError;
error_chain! {
foreign_links {
Url(::url::ParseError);
@ -10,3 +12,16 @@ error_chain! {
SparqlParser(::sparql::parser::ParseError);
}
}
impl<T> From<PoisonError<T>> for Error {
fn from(_: PoisonError<T>) -> Self {
//TODO: improve conversion
"Unexpected lock error".into()
}
}
impl From<Error> for fmt::Error {
fn from(_: Error) -> Self {
fmt::Error
}
}

@ -1,3 +1,4 @@
use errors::*;
use model::*;
use std::collections::hash_map::DefaultHasher;
use std::collections::BTreeSet;
@ -5,55 +6,55 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
use std::hash::Hasher;
use store::memory::MemoryGraph;
use store::Graph;
#[derive(Eq, PartialEq, Hash, Ord, PartialOrd)]
struct SubjectPredicate<'a> {
subject: &'a NamedOrBlankNode,
predicate: &'a NamedNode,
struct SubjectPredicate {
subject: NamedOrBlankNode,
predicate: NamedNode,
}
impl<'a> SubjectPredicate<'a> {
fn new(subject: &'a NamedOrBlankNode, predicate: &'a NamedNode) -> Self {
impl SubjectPredicate {
fn new(subject: NamedOrBlankNode, predicate: NamedNode) -> Self {
Self { subject, predicate }
}
}
#[derive(Eq, PartialEq, Hash, Ord, PartialOrd)]
struct PredicateObject<'a> {
predicate: &'a NamedNode,
object: &'a Term,
struct PredicateObject {
predicate: NamedNode,
object: Term,
}
impl<'a> PredicateObject<'a> {
fn new(predicate: &'a NamedNode, object: &'a Term) -> Self {
impl PredicateObject {
fn new(predicate: NamedNode, object: Term) -> Self {
Self { predicate, object }
}
}
fn subject_predicates_for_object<'a>(
graph: &'a MemoryGraph,
object: &'a Term,
) -> impl Iterator<Item = SubjectPredicate<'a>> {
graph
.triples_for_object(object)
.map(|t| SubjectPredicate::new(t.subject(), t.predicate()))
/// Yields the (subject, predicate) pair of every triple of `graph` whose object is `object`.
///
/// The outer `Result` reports a failure to start the lookup; each item carries the
/// per-triple result produced by the underlying iterator.
fn subject_predicates_for_object(
    graph: &impl Graph,
    object: &Term,
) -> Result<impl Iterator<Item = Result<SubjectPredicate>>> {
    let triples = graph.triples_for_object(object)?;
    Ok(triples.map(|triple| {
        triple.map(|t| SubjectPredicate::new(t.subject().clone(), t.predicate_owned()))
    }))
}
fn predicate_objects_for_subject<'a>(
graph: &'a MemoryGraph,
subject: &'a NamedOrBlankNode,
) -> impl Iterator<Item = PredicateObject<'a>> {
graph
.triples_for_subject(subject)
.map(|t| PredicateObject::new(t.predicate(), t.object()))
/// Yields the (predicate, object) pair of every triple of `graph` whose subject is `subject`.
///
/// The outer `Result` reports a failure to start the lookup; each item carries the
/// per-triple result produced by the underlying iterator.
fn predicate_objects_for_subject(
    graph: &impl Graph,
    subject: &NamedOrBlankNode,
) -> Result<impl Iterator<Item = Result<PredicateObject>>> {
    let triples = graph.triples_for_subject(subject)?;
    Ok(triples.map(|triple| {
        triple.map(|t| PredicateObject::new(t.predicate().clone(), t.object_owned()))
    }))
}
fn hash_blank_nodes<'a>(
bnodes: HashSet<&'a BlankNode>,
graph: &'a MemoryGraph,
) -> HashMap<u64, Vec<&'a BlankNode>> {
let mut bnodes_by_hash: HashMap<u64, Vec<&BlankNode>> = HashMap::default();
fn hash_blank_nodes(
bnodes: HashSet<BlankNode>,
graph: &impl Graph,
) -> Result<HashMap<u64, Vec<BlankNode>>> {
let mut bnodes_by_hash: HashMap<u64, Vec<BlankNode>> = HashMap::default();
// NB: we need to sort the triples to have the same hash
for bnode in bnodes.into_iter() {
@ -62,7 +63,8 @@ fn hash_blank_nodes<'a>(
{
let subject = NamedOrBlankNode::from(bnode.clone());
let mut po_set: BTreeSet<PredicateObject> = BTreeSet::default();
for po in predicate_objects_for_subject(&graph, &subject) {
for po in predicate_objects_for_subject(graph, &subject)? {
let po = po?;
if !po.object.is_blank_node() {
po_set.insert(po);
}
@ -75,7 +77,8 @@ fn hash_blank_nodes<'a>(
{
let object = Term::from(bnode.clone());
let mut sp_set: BTreeSet<SubjectPredicate> = BTreeSet::default();
for sp in subject_predicates_for_object(&graph, &object) {
for sp in subject_predicates_for_object(graph, &object)? {
let sp = sp?;
if !sp.subject.is_blank_node() {
sp_set.insert(sp);
}
@ -91,64 +94,66 @@ fn hash_blank_nodes<'a>(
.push(bnode);
}
bnodes_by_hash
Ok(bnodes_by_hash)
}
pub trait GraphIsomorphism {
/// Checks if two graphs are [isomorphic](https://www.w3.org/TR/rdf11-concepts/#dfn-graph-isomorphism)
fn is_isomorphic(&self, other: &Self) -> bool;
fn is_isomorphic(&self, other: &Self) -> Result<bool>;
}
impl GraphIsomorphism for MemoryGraph {
impl<G: Graph> GraphIsomorphism for G {
//TODO: proper isomorphism building
fn is_isomorphic(&self, other: &Self) -> bool {
if self.len() != other.len() {
return false;
fn is_isomorphic(&self, other: &Self) -> Result<bool> {
if self.len()? != other.len()? {
return Ok(false);
}
let mut self_bnodes: HashSet<&BlankNode> = HashSet::default();
let mut other_bnodes: HashSet<&BlankNode> = HashSet::default();
let mut self_bnodes: HashSet<BlankNode> = HashSet::default();
let mut other_bnodes: HashSet<BlankNode> = HashSet::default();
for t in self {
for t in self.iter()? {
let t = t?;
if let NamedOrBlankNode::BlankNode(subject) = t.subject() {
self_bnodes.insert(subject);
self_bnodes.insert(subject.clone());
if let Term::BlankNode(object) = t.object() {
self_bnodes.insert(object);
self_bnodes.insert(object.clone());
}
} else if let Term::BlankNode(object) = t.object() {
self_bnodes.insert(object);
} else if !other.contains(t) {
return false;
self_bnodes.insert(object.clone());
} else if !other.contains(&t)? {
return Ok(false);
}
}
for t in other {
for t in other.iter()? {
let t = t?;
if let NamedOrBlankNode::BlankNode(subject) = t.subject() {
other_bnodes.insert(subject);
other_bnodes.insert(subject.clone());
if let Term::BlankNode(object) = t.object() {
other_bnodes.insert(object);
other_bnodes.insert(object.clone());
}
} else if let Term::BlankNode(object) = t.object() {
other_bnodes.insert(object);
} else if !self.contains(t) {
return false;
other_bnodes.insert(object.clone());
} else if !self.contains(&t)? {
return Ok(false);
}
}
let self_bnodes_by_hash = hash_blank_nodes(self_bnodes, &self);
let other_bnodes_by_hash = hash_blank_nodes(other_bnodes, &other);
let self_bnodes_by_hash = hash_blank_nodes(self_bnodes, self)?;
let other_bnodes_by_hash = hash_blank_nodes(other_bnodes, other)?;
if self_bnodes_by_hash.len() != other_bnodes_by_hash.len() {
return false;
return Ok(false);
}
for hash in self_bnodes_by_hash.keys() {
if self_bnodes_by_hash.get(hash).map(|l| l.len())
!= other_bnodes_by_hash.get(hash).map(|l| l.len())
{
return false;
return Ok(false);
}
}
true
Ok(true)
}
}

@ -1,187 +1,457 @@
use model::vocab::rdf;
use model::*;
use std::collections::HashSet;
use std::fmt;
use std::iter::FromIterator;
use errors::*;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::sync::RwLock;
use store::numeric_encoder::*;
use store::store::*;
#[derive(Debug, Clone, Default)]
pub struct MemoryGraph {
triples: HashSet<Triple>,
}
pub type MemoryDataset = StoreDataset<MemoryStore>;
pub type MemoryGraph = StoreDefaultGraph<MemoryStore>;
impl MemoryGraph {
pub fn iter(&self) -> impl Iterator<Item = &Triple> {
self.triples.iter()
/// In-memory store: a dictionary between numeric ids and byte strings, plus one set of
/// triple indexes per graph name. Each map sits behind its own `RwLock`.
#[derive(Default)]
pub struct MemoryStore {
// id -> byte string; `insert_bytes` allocates ids from the current length of this map
id2str: RwLock<BTreeMap<u64, Vec<u8>>>,
// byte string -> id, the reverse direction of `id2str`
str2id: RwLock<BTreeMap<Vec<u8>, u64>>,
// graph name -> indexes over the triples stored in that graph
graph_indexes: RwLock<BTreeMap<EncodedTerm, MemoryGraphIndexes>>,
}
pub fn triples_for_subject<'a>(
&'a self,
subject: &'a NamedOrBlankNode,
) -> impl Iterator<Item = &'a Triple> {
self.iter().filter(move |t| t.subject() == subject)
/// The three nested indexes kept for a single graph; `insert` and `remove` update all of
/// them for every quad.
#[derive(Default)]
struct MemoryGraphIndexes {
// subject -> predicate -> set of objects
spo: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
// predicate -> object -> set of subjects
pos: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
// object -> subject -> set of predicates
osp: BTreeMap<EncodedTerm, BTreeMap<EncodedTerm, BTreeSet<EncodedTerm>>>,
}
pub fn triples_for_predicate<'a>(
&'a self,
predicate: &'a NamedNode,
) -> impl Iterator<Item = &'a Triple> {
self.iter().filter(move |t| t.predicate() == predicate)
}
impl BytesStore for MemoryStore {
type BytesOutput = Vec<u8>;
pub fn triples_for_object<'a>(&'a self, object: &'a Term) -> impl Iterator<Item = &'a Triple> {
self.iter().filter(move |t| t.object() == object)
/// Returns the id associated with `value`, allocating a new one if it is not yet known.
///
/// Both dictionaries are write-locked (`id2str` first, then `str2id`) for the whole
/// operation. A new id is the current number of entries in `id2str`.
///
/// # Errors
/// Fails if either lock is poisoned.
fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
    let mut id2str = self.id2str.write()?;
    let mut str2id = self.str2id.write()?;
    // Look up by slice first so an already-known value costs no allocation.
    if let Some(id) = str2id.get(value) {
        return Ok(*id);
    }
    let id = id2str.len() as u64;
    id2str.insert(id, value.to_vec());
    str2id.insert(value.to_vec(), id);
    Ok(id)
}
pub fn triples_for_subject_predicate<'a>(
&'a self,
subject: &'a NamedOrBlankNode,
predicate: &'a NamedNode,
) -> impl Iterator<Item = &'a Triple> {
self.iter()
.filter(move |t| t.subject() == subject && t.predicate() == predicate)
/// Returns a copy of the byte string stored under `id`, or `None` if the id is unknown.
///
/// # Errors
/// Fails if the `id2str` lock is poisoned.
fn get_bytes(&self, id: u64) -> Result<Option<Vec<u8>>> {
    let id2str = self.id2str.read()?;
    Ok(id2str.get(&id).cloned())
}
pub fn objects_for_subject_predicate<'a>(
&'a self,
subject: &'a NamedOrBlankNode,
predicate: &'a NamedNode,
) -> impl Iterator<Item = &'a Term> {
self.triples_for_subject_predicate(subject, predicate)
.map(|t| t.object())
}
pub fn object_for_subject_predicate<'a>(
&'a self,
subject: &'a NamedOrBlankNode,
predicate: &'a NamedNode,
) -> Option<&'a Term> {
self.objects_for_subject_predicate(subject, predicate)
.nth(0)
}
impl EncodedQuadsStore for MemoryStore {
type QuadsIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectPredicateIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectPredicateObjectIterator =
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForPredicateIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForPredicateObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForObjectIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectPredicateGraphIterator =
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForSubjectObjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForPredicateGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForPredicateObjectGraphIterator =
<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
type QuadsForObjectGraphIterator = <Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter;
pub fn triples_for_predicate_object<'a>(
&'a self,
predicate: &'a NamedNode,
object: &'a Term,
) -> impl Iterator<Item = &'a Triple> {
self.iter()
.filter(move |t| t.predicate() == predicate && t.object() == object)
/// Lists every quad of every graph by walking each graph's `spo` index.
///
/// The quads are copied into a `Vec` while the read lock is held.
fn quads(&self) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        for (subject, predicate_objects) in &indexes.spo {
            for (predicate, objects) in predicate_objects {
                quads.extend(
                    objects
                        .iter()
                        .map(|object| Ok(encoded_quad(subject, predicate, object, graph_name))),
                );
            }
        }
    }
    Ok(quads.into_iter())
}
pub fn subjects_for_predicate_object<'a>(
&'a self,
predicate: &'a NamedNode,
object: &'a Term,
) -> impl Iterator<Item = &'a NamedOrBlankNode> {
self.triples_for_predicate_object(predicate, object)
.map(|t| t.subject())
/// Lists, across all graphs, the quads whose subject is `subject` (via the `spo` index).
fn quads_for_subject(
    &self,
    subject: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let predicate_objects = match indexes.spo.get(subject) {
            Some(predicate_objects) => predicate_objects,
            None => continue,
        };
        for (predicate, objects) in predicate_objects {
            quads.extend(
                objects
                    .iter()
                    .map(|object| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
pub fn subject_for_predicate_object<'a>(
&'a self,
predicate: &'a NamedNode,
object: &'a Term,
) -> Option<&'a NamedOrBlankNode> {
self.subjects_for_predicate_object(predicate, object).nth(0)
/// Lists, across all graphs, the quads matching `subject` and `predicate` (via `spo`).
fn quads_for_subject_predicate(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let objects = indexes
            .spo
            .get(subject)
            .and_then(|predicate_objects| predicate_objects.get(predicate));
        if let Some(objects) = objects {
            quads.extend(
                objects
                    .iter()
                    .map(|object| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
pub fn values_for_list<'a>(&'a self, root: NamedOrBlankNode) -> ListIterator<'a> {
ListIterator {
graph: self,
current_node: Some(root),
/// Lists the quads equal to (`subject`, `predicate`, `object`): at most one per graph
/// that contains the triple.
fn quads_for_subject_predicate_object(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let present = indexes
            .spo
            .get(subject)
            .and_then(|predicate_objects| predicate_objects.get(predicate))
            .map_or(false, |objects| objects.contains(object));
        if present {
            quads.push(Ok(encoded_quad(subject, predicate, object, graph_name)));
        }
    }
    Ok(quads.into_iter())
}
pub fn len(&self) -> usize {
self.triples.len()
/// Lists, across all graphs, the quads matching `subject` and `object` (via the `osp` index).
fn quads_for_subject_object(
    &self,
    subject: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let predicates = indexes
            .osp
            .get(object)
            .and_then(|subject_predicates| subject_predicates.get(subject));
        if let Some(predicates) = predicates {
            quads.extend(
                predicates
                    .iter()
                    .map(|predicate| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
pub fn is_empty(&self) -> bool {
self.triples.is_empty()
/// Lists, across all graphs, the quads whose predicate is `predicate` (via the `pos` index).
fn quads_for_predicate(
    &self,
    predicate: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let object_subjects = match indexes.pos.get(predicate) {
            Some(object_subjects) => object_subjects,
            None => continue,
        };
        for (object, subjects) in object_subjects {
            quads.extend(
                subjects
                    .iter()
                    .map(|subject| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
pub fn contains(&self, value: &Triple) -> bool {
self.triples.contains(value)
/// Lists, across all graphs, the quads matching `predicate` and `object` (via `pos`).
fn quads_for_predicate_object(
    &self,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let subjects = indexes
            .pos
            .get(predicate)
            .and_then(|object_subjects| object_subjects.get(object));
        if let Some(subjects) = subjects {
            quads.extend(
                subjects
                    .iter()
                    .map(|subject| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
pub fn insert(&mut self, value: Triple) -> bool {
self.triples.insert(value)
/// Lists, across all graphs, the quads whose object is `object` (via the `osp` index).
fn quads_for_object(
    &self,
    object: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    for (graph_name, indexes) in graph_indexes.iter() {
        let subject_predicates = match indexes.osp.get(object) {
            Some(subject_predicates) => subject_predicates,
            None => continue,
        };
        for (subject, predicates) in subject_predicates {
            quads.extend(
                predicates
                    .iter()
                    .map(|predicate| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
impl fmt::Display for MemoryGraph {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
for triple in &self.triples {
write!(fmt, "{}\n", triple)?;
/// Lists every quad stored in the graph named `graph_name` by walking that graph's `spo`
/// index; the result is empty when the graph has no entry in `graph_indexes`.
fn quads_for_graph(
&self,
graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
let mut result = Vec::default();
// The read guard is a temporary of the `if let` scrutinee, so it is held for the whole body.
if let Some(graph) = self.graph_indexes.read()?.get(graph_name) {
for (s, pos) in graph.spo.iter() {
for (p, os) in pos.iter() {
for o in os.iter() {
result.push(Ok(encoded_quad(s, p, o, graph_name)))
}
}
// NOTE(review): the next line looks like residue of the removed `fmt::Display` impl
// rather than part of this method — confirm and drop it.
Ok(())
}
}
Ok(result.into_iter())
}
impl IntoIterator for MemoryGraph {
type Item = Triple;
type IntoIter = <HashSet<Triple> as IntoIterator>::IntoIter;
/// Lists the quads of graph `graph_name` whose subject is `subject` (via the `spo` index).
fn quads_for_subject_graph(
    &self,
    subject: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let predicate_objects = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.spo.get(subject));
    if let Some(predicate_objects) = predicate_objects {
        for (predicate, objects) in predicate_objects {
            quads.extend(
                objects
                    .iter()
                    .map(|object| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
self.triples.into_iter()
/// Lists the quads of graph `graph_name` matching `subject` and `predicate` (via `spo`).
fn quads_for_subject_predicate_graph(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let objects = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.spo.get(subject))
        .and_then(|predicate_objects| predicate_objects.get(predicate));
    if let Some(objects) = objects {
        quads.extend(
            objects
                .iter()
                .map(|object| Ok(encoded_quad(subject, predicate, object, graph_name))),
        );
    }
    Ok(quads.into_iter())
}
impl<'a> IntoIterator for &'a MemoryGraph {
type Item = &'a Triple;
type IntoIter = <&'a HashSet<Triple> as IntoIterator>::IntoIter;
/// Lists the quads of graph `graph_name` matching `subject` and `object` (via `osp`).
fn quads_for_subject_object_graph(
    &self,
    subject: &EncodedTerm,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let predicates = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.osp.get(object))
        .and_then(|subject_predicates| subject_predicates.get(subject));
    if let Some(predicates) = predicates {
        quads.extend(
            predicates
                .iter()
                .map(|predicate| Ok(encoded_quad(subject, predicate, object, graph_name))),
        );
    }
    Ok(quads.into_iter())
}
fn into_iter(self) -> <Self as IntoIterator>::IntoIter {
self.triples.iter()
/// Lists the quads of graph `graph_name` whose predicate is `predicate` (via `pos`).
fn quads_for_predicate_graph(
    &self,
    predicate: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let object_subjects = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.pos.get(predicate));
    if let Some(object_subjects) = object_subjects {
        for (object, subjects) in object_subjects {
            quads.extend(
                subjects
                    .iter()
                    .map(|subject| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
impl FromIterator<Triple> for MemoryGraph {
fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
let triples = HashSet::from_iter(iter);
Self { triples }
/// Lists the quads of graph `graph_name` matching `predicate` and `object` (via `pos`).
fn quads_for_predicate_object_graph(
    &self,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let subjects = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.pos.get(predicate))
        .and_then(|object_subjects| object_subjects.get(object));
    if let Some(subjects) = subjects {
        quads.extend(
            subjects
                .iter()
                .map(|subject| Ok(encoded_quad(subject, predicate, object, graph_name))),
        );
    }
    Ok(quads.into_iter())
}
impl Extend<Triple> for MemoryGraph {
fn extend<I: IntoIterator<Item = Triple>>(&mut self, iter: I) {
self.triples.extend(iter)
/// Lists the quads of graph `graph_name` whose object is `object` (via the `osp` index).
fn quads_for_object_graph(
    &self,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<<Vec<Result<EncodedQuad>> as IntoIterator>::IntoIter> {
    let graph_indexes = self.graph_indexes.read()?;
    let mut quads: Vec<Result<EncodedQuad>> = Vec::new();
    let subject_predicates = graph_indexes
        .get(graph_name)
        .and_then(|indexes| indexes.osp.get(object));
    if let Some(subject_predicates) = subject_predicates {
        for (subject, predicates) in subject_predicates {
            quads.extend(
                predicates
                    .iter()
                    .map(|predicate| Ok(encoded_quad(subject, predicate, object, graph_name))),
            );
        }
    }
    Ok(quads.into_iter())
}
impl<'a> Extend<&'a Triple> for MemoryGraph {
fn extend<I: IntoIterator<Item = &'a Triple>>(&mut self, iter: I) {
self.triples.extend(iter.into_iter().cloned())
/// Tells whether `quad` is stored, by descending graph name -> subject -> predicate in
/// the `spo` index and testing membership of the object.
///
/// A missing entry at any level yields `false`.
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
    let graph_indexes = self.graph_indexes.read()?;
    let found = graph_indexes
        .get(&quad.graph_name)
        .and_then(|indexes| indexes.spo.get(&quad.subject))
        .and_then(|predicate_objects| predicate_objects.get(&quad.predicate))
        .map_or(false, |objects| objects.contains(&quad.object));
    Ok(found)
}
/// Records `quad` in the `spo`, `pos` and `osp` indexes of its graph, creating the
/// graph's index set and any intermediate maps on first use.
///
/// # Errors
/// Fails if the `graph_indexes` lock is poisoned.
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
    let mut graph_indexes = self.graph_indexes.write()?;
    let indexes = graph_indexes.entry(quad.graph_name.clone()).or_default();
    let (subject, predicate, object) = (&quad.subject, &quad.predicate, &quad.object);

    indexes
        .spo
        .entry(subject.clone())
        .or_default()
        .entry(predicate.clone())
        .or_default()
        .insert(object.clone());

    indexes
        .pos
        .entry(predicate.clone())
        .or_default()
        .entry(object.clone())
        .or_default()
        .insert(subject.clone());

    indexes
        .osp
        .entry(object.clone())
        .or_default()
        .entry(subject.clone())
        .or_default()
        .insert(predicate.clone());

    Ok(())
}
pub struct ListIterator<'a> {
graph: &'a MemoryGraph,
current_node: Option<NamedOrBlankNode>,
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut graph_indexes = self.graph_indexes.write()?;
let mut empty_graph = false;
if let Some(graph) = graph_indexes.get_mut(&quad.graph_name) {
{
let mut empty_pos = false;
if let Some(mut pos) = graph.spo.get_mut(&quad.subject) {
let mut empty_os = false;
if let Some(mut os) = pos.get_mut(&quad.predicate) {
os.remove(&quad.object);
empty_os = os.is_empty();
}
if empty_os {
pos.remove(&quad.predicate);
}
empty_pos = pos.is_empty();
}
if empty_pos {
graph.spo.remove(&quad.subject);
}
}
impl<'a> Iterator for ListIterator<'a> {
type Item = Term;
{
let mut empty_oss = false;
if let Some(mut oss) = graph.pos.get_mut(&quad.predicate) {
let mut empty_ss = false;
if let Some(mut ss) = oss.get_mut(&quad.object) {
ss.remove(&quad.subject);
empty_ss = ss.is_empty();
}
if empty_ss {
oss.remove(&quad.object);
}
empty_oss = oss.is_empty();
}
if empty_oss {
graph.pos.remove(&quad.predicate);
}
}
fn next(&mut self) -> Option<Term> {
match self.current_node.clone() {
Some(current) => {
let result = self
.graph
.object_for_subject_predicate(&current, &rdf::FIRST)?
.clone();
self.current_node = match self
.graph
.object_for_subject_predicate(&current, &rdf::REST)
{
Some(Term::NamedNode(n)) if *n == *rdf::NIL => None,
Some(Term::NamedNode(n)) => Some(n.clone().into()),
Some(Term::BlankNode(n)) => Some(n.clone().into()),
_ => None,
};
Some(result)
let mut empty_sps = false;
if let Some(mut sps) = graph.osp.get_mut(&quad.object) {
let mut empty_ps = false;
if let Some(mut ps) = sps.get_mut(&quad.subject) {
ps.remove(&quad.predicate);
empty_ps = ps.is_empty();
}
if empty_ps {
sps.remove(&quad.subject);
}
empty_sps = sps.is_empty();
}
if empty_sps {
graph.osp.remove(&quad.object);
}
}
None => None,
empty_graph = graph.spo.is_empty();
}
if empty_graph {
graph_indexes.remove(&quad.graph_name);
}
Ok(())
}
}
/// Builds an owned `EncodedQuad` by cloning the four borrowed terms.
fn encoded_quad(
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> EncodedQuad {
    let subject = subject.clone();
    let predicate = predicate.clone();
    let object = object.clone();
    let graph_name = graph_name.clone();
    EncodedQuad::new(subject, predicate, object, graph_name)
}

@ -1,19 +1,23 @@
pub mod isomorphism;
pub mod memory;
mod memory;
mod numeric_encoder;
pub mod rocksdb;
mod rocksdb;
mod store;
use errors::*;
use model::*;
pub use store::memory::MemoryDataset;
pub use store::memory::MemoryGraph;
pub use store::rocksdb::RocksDbDataset;
pub trait Graph {
type TriplesIterator: Iterator<Item = Result<Triple>>;
type TriplesForSubjectIterator: Iterator<Item = Result<Triple>>;
type TriplesForSubjectPredicateIterator: Iterator<Item = Result<Triple>>;
type TriplesForSubjectObjectIterator: Iterator<Item = Result<Triple>>;
type ObjectsForSubjectPredicateIterator: Iterator<Item = Result<Term>>;
type PredicatesForSubjectObjectIterator: Iterator<Item = Result<NamedNode>>;
type TriplesForPredicateIterator: Iterator<Item = Result<Triple>>;
type TriplesForPredicateObjectIterator: Iterator<Item = Result<Triple>>;
type SubjectsForPredicateObjectIterator: Iterator<Item = Result<NamedOrBlankNode>>;
type TriplesForObjectIterator: Iterator<Item = Result<Triple>>;
fn iter(&self) -> Result<Self::TriplesIterator> {
@ -27,28 +31,43 @@ pub trait Graph {
subject: &NamedOrBlankNode,
) -> Result<Self::TriplesForSubjectIterator>;
fn triples_for_subject_predicate(
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<Self::TriplesForSubjectPredicateIterator>;
) -> Result<Self::ObjectsForSubjectPredicateIterator>;
/// Returns the first object yielded by `objects_for_subject_predicate`, if any.
///
/// # Errors
/// Fails if the lookup cannot be started or if the first yielded item is an error.
fn object_for_subject_predicate(
    &self,
    subject: &NamedOrBlankNode,
    predicate: &NamedNode,
) -> Result<Option<Term>> {
    //TODO use transpose when stable
    let mut objects = self.objects_for_subject_predicate(subject, predicate)?;
    if let Some(object) = objects.nth(0) {
        Ok(Some(object?))
    } else {
        Ok(None)
    }
}
fn triples_for_subject_object(
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<Self::TriplesForSubjectObjectIterator>;
) -> Result<Self::PredicatesForSubjectObjectIterator>;
fn triples_for_predicate(
&self,
predicate: &NamedNode,
) -> Result<Self::TriplesForPredicateIterator>;
fn triples_for_predicate_object(
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<Self::TriplesForPredicateObjectIterator>;
) -> Result<Self::SubjectsForPredicateObjectIterator>;
fn triples_for_object(&self, object: &Term) -> Result<Self::TriplesForObjectIterator>;

@ -53,6 +53,22 @@ pub struct EncodedQuad {
pub graph_name: EncodedTerm,
}
impl EncodedQuad {
pub fn new(
subject: EncodedTerm,
predicate: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> Self {
Self {
subject,
predicate,
object,
graph_name,
}
}
}
pub trait TermReader {
fn read_term(&mut self) -> Result<EncodedTerm>;
fn read_spog_quad(&mut self) -> Result<EncodedQuad>;

@ -79,11 +79,7 @@ impl BytesStore for RocksDbStore {
Ok(match self.db.get_cf(self.str2id_cf, value)? {
Some(id) => from_bytes_slice(&id),
None => {
let id = self
.str_id_counter
.lock()
.unwrap()
.get_and_increment(&self.db)? as u64;
let id = self.str_id_counter.lock()?.get_and_increment(&self.db)? as u64;
let id_bytes = to_bytes(id);
let mut batch = WriteBatch::default();
batch.put_cf(self.id2str_cf, &id_bytes, value)?;

@ -1,524 +0,0 @@
use errors::*;
use rocksdb::ColumnFamily;
use rocksdb::DBRawIterator;
use rocksdb::DBVector;
use rocksdb::Options;
use rocksdb::WriteBatch;
use rocksdb::DB;
use std::io::Cursor;
use std::mem::size_of;
use std::path::Path;
use std::str;
use std::sync::Mutex;
use store::numeric_encoder::*;
use store::store::EncodedQuadsStore;
use utils::from_bytes;
use utils::from_bytes_slice;
use utils::to_bytes;
// Column family names. The two dictionary directions need distinct families: `STR2ID_CF`
// used to be spelled "id2str" as well, which put id -> string and string -> id entries
// into one key space.
// NOTE(review): databases written before this fix keep their string -> id entries in the
// "id2str" family; those strings will get fresh ids on their next insert — confirm this
// is acceptable or add a migration.
const ID2STR_CF: &'static str = "id2str";
const STR2ID_CF: &'static str = "str2id";
const SPOG_CF: &'static str = "spog";
const POSG_CF: &'static str = "posg";
const OSPG_CF: &'static str = "ospg";
const EMPTY_BUF: [u8; 0] = [0 as u8; 0];
//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving)
const COLUMN_FAMILIES: [&'static str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF];

/// RocksDB-backed store: one column family per dictionary direction and per quad index.
pub struct RocksDbStore {
    db: DB,
    // Allocates the numeric ids handed out by `insert_bytes`.
    str_id_counter: Mutex<RocksDBCounter>,
    // id -> byte string
    id2str_cf: ColumnFamily,
    // byte string -> id
    str2id_cf: ColumnFamily,
    // quad indexes, named after the order of the terms in their keys
    spog_cf: ColumnFamily,
    posg_cf: ColumnFamily,
    ospg_cf: ColumnFamily,
}

impl RocksDbStore {
    /// Opens the database at `path`, creating it and any missing column family.
    ///
    /// # Errors
    /// Fails if RocksDB cannot open the database or a column family handle cannot be
    /// obtained.
    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
        let mut options = Options::default();
        options.create_if_missing(true);
        options.create_missing_column_families(true);

        let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?;
        // Each handle is bound to the family of the same name (they used to be crossed).
        let id2str_cf = get_cf(&db, ID2STR_CF)?;
        let str2id_cf = get_cf(&db, STR2ID_CF)?;
        let spog_cf = get_cf(&db, SPOG_CF)?;
        let posg_cf = get_cf(&db, POSG_CF)?;
        let ospg_cf = get_cf(&db, OSPG_CF)?;

        Ok(Self {
            db,
            str_id_counter: Mutex::new(RocksDBCounter::new("bsc")),
            id2str_cf,
            str2id_cf,
            spog_cf,
            posg_cf,
            ospg_cf,
        })
    }
}
impl BytesStore for RocksDbStore {
    type BytesOutput = DBVector;

    /// Returns the id stored for `value`, allocating and persisting a new one if absent.
    ///
    /// A new id comes from `str_id_counter`; both dictionary directions are written in a
    /// single `WriteBatch`.
    ///
    /// # Errors
    /// Fails on RocksDB errors or if the counter mutex is poisoned.
    fn insert_bytes(&self, value: &[u8]) -> Result<u64> {
        Ok(match self.db.get_cf(self.str2id_cf, value)? {
            Some(id) => from_bytes_slice(&id),
            None => {
                // Report a poisoned mutex as an error instead of panicking.
                let id = self.str_id_counter.lock()?.get_and_increment(&self.db)? as u64;
                let id_bytes = to_bytes(id);
                let mut batch = WriteBatch::default();
                batch.put_cf(self.id2str_cf, &id_bytes, value)?;
                batch.put_cf(self.str2id_cf, value, &id_bytes)?;
                self.db.write(batch)?;
                id
            }
        })
    }

    /// Returns the bytes stored under `id`, or `None` if the id is unknown.
    fn get_bytes(&self, id: u64) -> Result<Option<DBVector>> {
        Ok(self.db.get_cf(self.id2str_cf, &to_bytes(id))?)
    }
}
impl EncodedQuadsStore for RocksDbStore {
type QuadsIterator = SPOGIndexIterator;
type QuadsForSubjectIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>;
type QuadsForSubjectPredicateIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>;
type QuadsForSubjectPredicateObjectIterator = FilteringEncodedQuadsIterator<SPOGIndexIterator>;
type QuadsForSubjectObjectIterator = FilteringEncodedQuadsIterator<OSPGIndexIterator>;
type QuadsForPredicateIterator = FilteringEncodedQuadsIterator<POSGIndexIterator>;
type QuadsForPredicateObjectIterator = FilteringEncodedQuadsIterator<POSGIndexIterator>;
type QuadsForObjectIterator = FilteringEncodedQuadsIterator<OSPGIndexIterator>;
type QuadsForGraphIterator = InGraphQuadsIterator<SPOGIndexIterator>;
type QuadsForSubjectGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>;
type QuadsForSubjectPredicateGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>;
type QuadsForSubjectObjectGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>;
type QuadsForPredicateGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>;
type QuadsForPredicateObjectGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>;
type QuadsForObjectGraphIterator =
InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>;
/// Iterates over every stored quad, starting at the first key of the SPOG index.
fn quads(&self) -> Result<SPOGIndexIterator> {
    let mut raw = self.db.raw_iterator_cf(self.spog_cf)?;
    raw.seek_to_first();
    Ok(SPOGIndexIterator { iter: raw })
}
/// Iterates over the SPOG index from the encoded `subject`, filtered on that subject.
fn quads_for_subject(
    &self,
    subject: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
    let mut raw = self.db.raw_iterator_cf(self.spog_cf)?;
    let start_key = encode_term(subject)?;
    raw.seek(&start_key);
    let filter = EncodedQuadPattern::new(Some(subject.clone()), None, None, None);
    Ok(FilteringEncodedQuadsIterator {
        iter: SPOGIndexIterator { iter: raw },
        filter,
    })
}
/// Iterates over the SPOG index from the encoded (`subject`, `predicate`) pair, filtered
/// on both terms.
fn quads_for_subject_predicate(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
    let mut raw = self.db.raw_iterator_cf(self.spog_cf)?;
    let start_key = encode_term_pair(subject, predicate)?;
    raw.seek(&start_key);
    let filter =
        EncodedQuadPattern::new(Some(subject.clone()), Some(predicate.clone()), None, None);
    Ok(FilteringEncodedQuadsIterator {
        iter: SPOGIndexIterator { iter: raw },
        filter,
    })
}
/// Iterates over the SPOG index from the encoded (`subject`, `predicate`, `object`)
/// triple, filtered on all three terms.
fn quads_for_subject_predicate_object(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
    let mut raw = self.db.raw_iterator_cf(self.spog_cf)?;
    let start_key = encode_term_triple(subject, predicate, object)?;
    raw.seek(&start_key);
    let filter = EncodedQuadPattern::new(
        Some(subject.clone()),
        Some(predicate.clone()),
        Some(object.clone()),
        None,
    );
    Ok(FilteringEncodedQuadsIterator {
        iter: SPOGIndexIterator { iter: raw },
        filter,
    })
}
/// Iterates over the OSPG index from the encoded (`object`, `subject`) pair, filtered on
/// both terms.
fn quads_for_subject_object(
    &self,
    subject: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
    // The seek key and `OSPGIndexIterator` both assume OSPG key order, so the raw
    // iterator must come from the OSPG column family (it used to be opened on SPOG).
    let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?;
    iter.seek(&encode_term_pair(object, subject)?);
    Ok(FilteringEncodedQuadsIterator {
        iter: OSPGIndexIterator { iter },
        filter: EncodedQuadPattern::new(
            Some(subject.clone()),
            None,
            Some(object.clone()),
            None,
        ),
    })
}
/// Iterates over the POSG index from the encoded `predicate`, filtered on that predicate.
fn quads_for_predicate(
    &self,
    predicate: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
    let mut raw = self.db.raw_iterator_cf(self.posg_cf)?;
    let start_key = encode_term(predicate)?;
    raw.seek(&start_key);
    let filter = EncodedQuadPattern::new(None, Some(predicate.clone()), None, None);
    Ok(FilteringEncodedQuadsIterator {
        iter: POSGIndexIterator { iter: raw },
        filter,
    })
}
/// Iterates over the POSG index from the encoded (`predicate`, `object`) pair, filtered
/// on both terms.
fn quads_for_predicate_object(
    &self,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
    // The seek key and `POSGIndexIterator` both assume POSG key order, so the raw
    // iterator must come from the POSG column family (it used to be opened on SPOG).
    let mut iter = self.db.raw_iterator_cf(self.posg_cf)?;
    iter.seek(&encode_term_pair(predicate, object)?);
    Ok(FilteringEncodedQuadsIterator {
        iter: POSGIndexIterator { iter },
        filter: EncodedQuadPattern::new(
            None,
            Some(predicate.clone()),
            Some(object.clone()),
            None,
        ),
    })
}
/// Iterates over the OSPG index from the encoded `object`, filtered on that object.
fn quads_for_object(
    &self,
    object: &EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
    let mut raw = self.db.raw_iterator_cf(self.ospg_cf)?;
    let start_key = encode_term(object)?;
    raw.seek(&start_key);
    let filter = EncodedQuadPattern::new(None, None, Some(object.clone()), None);
    Ok(FilteringEncodedQuadsIterator {
        iter: OSPGIndexIterator { iter: raw },
        filter,
    })
}
/// Returns the quads belonging to the graph `graph_name`.
///
/// The iterator returned by `self.quads()` is wrapped so that only the quads
/// whose graph name equals `graph_name` (and the errors) are yielded.
fn quads_for_graph(
    &self,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<SPOGIndexIterator>> {
    let all_quads = self.quads()?;
    Ok(InGraphQuadsIterator {
        iter: all_quads,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given subject.
///
/// Delegates to `quads_for_subject` and keeps only the quads of the graph.
fn quads_for_subject_graph(
    &self,
    subject: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> {
    let candidates = self.quads_for_subject(subject)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given subject and
/// predicate.
///
/// Delegates to `quads_for_subject_predicate` and keeps only the quads of the
/// graph.
fn quads_for_subject_predicate_graph(
    &self,
    subject: &EncodedTerm,
    predicate: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> {
    let candidates = self.quads_for_subject_predicate(subject, predicate)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given subject and
/// object.
///
/// Delegates to `quads_for_subject_object` and keeps only the quads of the
/// graph.
fn quads_for_subject_object_graph(
    &self,
    subject: &EncodedTerm,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>> {
    let candidates = self.quads_for_subject_object(subject, object)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given predicate.
///
/// Delegates to `quads_for_predicate` and keeps only the quads of the graph.
fn quads_for_predicate_graph(
    &self,
    predicate: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>> {
    let candidates = self.quads_for_predicate(predicate)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given predicate and
/// object.
///
/// Delegates to `quads_for_predicate_object` and keeps only the quads of the
/// graph.
fn quads_for_predicate_object_graph(
    &self,
    predicate: &EncodedTerm,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>> {
    let candidates = self.quads_for_predicate_object(predicate, object)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Returns the quads of graph `graph_name` having the given object.
///
/// Delegates to `quads_for_object` and keeps only the quads of the graph.
fn quads_for_object_graph(
    &self,
    object: &EncodedTerm,
    graph_name: &EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>> {
    let candidates = self.quads_for_object(object)?;
    Ok(InGraphQuadsIterator {
        iter: candidates,
        graph_name: graph_name.clone(),
    })
}
/// Tells whether `quad` is stored, by looking up its SPOG-encoded key in the
/// SPOG column family.
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
    let key = encode_spog_quad(quad)?;
    let stored = self.db.get_cf(self.spog_cf, &key)?;
    Ok(stored.is_some())
}
/// Stores `quad` in the three indexes (SPOG, POSG and OSPG).
///
/// The three keys are put, each with `EMPTY_BUF` as value, in a single
/// `WriteBatch` that is then written to the database.
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
    let mut batch = WriteBatch::default();

    let spog_key = encode_spog_quad(quad)?;
    batch.put_cf(self.spog_cf, &spog_key, &EMPTY_BUF)?;

    let posg_key = encode_posg_quad(quad)?;
    batch.put_cf(self.posg_cf, &posg_key, &EMPTY_BUF)?;

    let ospg_key = encode_ospg_quad(quad)?;
    batch.put_cf(self.ospg_cf, &ospg_key, &EMPTY_BUF)?;

    //TODO: check what's going on if the key already exists
    self.db.write(batch)?;
    Ok(())
}
/// Removes `quad` from the three indexes (SPOG, POSG and OSPG).
///
/// The three key deletions are grouped in a single `WriteBatch` that is then
/// written to the database.
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
    let mut batch = WriteBatch::default();

    let spog_key = encode_spog_quad(quad)?;
    batch.delete_cf(self.spog_cf, &spog_key)?;

    let posg_key = encode_posg_quad(quad)?;
    batch.delete_cf(self.posg_cf, &posg_key)?;

    let ospg_key = encode_ospg_quad(quad)?;
    batch.delete_cf(self.ospg_cf, &ospg_key)?;

    self.db.write(batch)?;
    Ok(())
}
}
/// Fetches the handle of the column family called `name`.
///
/// # Errors
///
/// Fails with the message "column family not found" when `db` has no handle
/// for that name.
pub fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> {
    match db.cf_handle(name) {
        Some(handle) => Ok(handle),
        None => Err("column family not found".into()),
    }
}
/// A counter persisted in the database under a fixed key.
struct RocksDBCounter {
    /// Key (as UTF-8 bytes) under which the counter value is stored.
    name: &'static str,
}

impl RocksDBCounter {
    /// Creates a counter stored under the key `name`.
    fn new(name: &'static str) -> Self {
        RocksDBCounter { name }
    }

    /// Returns the current value of the counter and stores that value plus one.
    ///
    /// A missing key is read as `0`. The read and the write are two separate
    /// database calls.
    ///
    /// # Panics
    ///
    /// `copy_from_slice` panics if the stored value does not have exactly
    /// `size_of::<usize>()` bytes.
    // NOTE(review): the buffer is sized with `usize` while the counter is a
    // `u64`; presumably both are 8 bytes on the supported targets — confirm.
    fn get_and_increment(&self, db: &DB) -> Result<u64> {
        let key = self.name.as_bytes();
        let current = match db.get(key)? {
            Some(stored) => {
                let mut raw = [0 as u8; size_of::<usize>()];
                raw.copy_from_slice(&stored);
                from_bytes(raw)
            }
            None => 0,
        };
        db.put(key, &to_bytes(current + 1))?;
        Ok(current)
    }
}
/// A quad pattern: each position is either constrained to one term (`Some`)
/// or left free (`None`).
struct EncodedQuadPattern {
    subject: Option<EncodedTerm>,
    predicate: Option<EncodedTerm>,
    object: Option<EncodedTerm>,
    graph_name: Option<EncodedTerm>,
}

impl EncodedQuadPattern {
    /// Builds a pattern from the optional constraint of each position.
    fn new(
        subject: Option<EncodedTerm>,
        predicate: Option<EncodedTerm>,
        object: Option<EncodedTerm>,
        graph_name: Option<EncodedTerm>,
    ) -> Self {
        EncodedQuadPattern {
            subject,
            predicate,
            object,
            graph_name,
        }
    }

    /// Returns `true` when every constrained position of the pattern equals
    /// the term found at the same position in `quad`.
    fn filter(&self, quad: &EncodedQuad) -> bool {
        // A free position accepts any term.
        fn accepts(expected: &Option<EncodedTerm>, actual: &EncodedTerm) -> bool {
            expected.as_ref().map_or(true, |term| term == actual)
        }

        accepts(&self.subject, &quad.subject)
            && accepts(&self.predicate, &quad.predicate)
            && accepts(&self.object, &quad.object)
            && accepts(&self.graph_name, &quad.graph_name)
    }
}
fn encode_term(t: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_term(&t)?;
Ok(vec)
}
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_term(&t1)?;
vec.write_term(&t2)?;
Ok(vec)
}
fn encode_term_triple(t1: &EncodedTerm, t2: &EncodedTerm, t3: &EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_term(&t1)?;
vec.write_term(&t2)?;
vec.write_term(&t3)?;
Ok(vec)
}
fn encode_spog_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_spog_quad(quad)?;
Ok(vec)
}
fn encode_posg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_posg_quad(quad)?;
Ok(vec)
}
fn encode_ospg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> {
let mut vec = Vec::default();
vec.write_ospg_quad(quad)?;
Ok(vec)
}
/// Iterator decoding quads from the keys of a raw iterator with
/// `read_spog_quad`.
///
/// The raw iterator is expected to be already positioned (the constructors
/// in this file call `seek` before wrapping it).
pub struct SPOGIndexIterator {
    iter: DBRawIterator,
}

impl Iterator for SPOGIndexIterator {
    type Item = Result<EncodedQuad>;

    /// Decodes the key under the cursor, then moves the cursor forward.
    ///
    /// The key is read *before* advancing: advancing first would drop the
    /// entry the cursor was positioned on by the initial seek. The cursor is
    /// only advanced while it still exposes a key, so an exhausted cursor is
    /// never stepped again.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        let quad = self
            .iter
            .key()
            .map(|buffer| Cursor::new(buffer).read_spog_quad())?;
        self.iter.next();
        Some(quad)
    }
}
/// Iterator decoding quads from the keys of a raw iterator with
/// `read_posg_quad`.
///
/// The raw iterator is expected to be already positioned (the constructors
/// in this file call `seek` before wrapping it).
pub struct POSGIndexIterator {
    iter: DBRawIterator,
}

impl Iterator for POSGIndexIterator {
    type Item = Result<EncodedQuad>;

    /// Decodes the key under the cursor, then moves the cursor forward.
    ///
    /// The key is read *before* advancing: advancing first would drop the
    /// entry the cursor was positioned on by the initial seek. The cursor is
    /// only advanced while it still exposes a key, so an exhausted cursor is
    /// never stepped again.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        let quad = self
            .iter
            .key()
            .map(|buffer| Cursor::new(buffer).read_posg_quad())?;
        self.iter.next();
        Some(quad)
    }
}
/// Iterator decoding quads from the keys of a raw iterator with
/// `read_ospg_quad`.
///
/// The raw iterator is expected to be already positioned (the constructors
/// in this file call `seek` before wrapping it).
pub struct OSPGIndexIterator {
    iter: DBRawIterator,
}

impl Iterator for OSPGIndexIterator {
    type Item = Result<EncodedQuad>;

    /// Decodes the key under the cursor, then moves the cursor forward.
    ///
    /// The key is read *before* advancing: advancing first would drop the
    /// entry the cursor was positioned on by the initial seek. The cursor is
    /// only advanced while it still exposes a key, so an exhausted cursor is
    /// never stepped again.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        let quad = self
            .iter
            .key()
            .map(|buffer| Cursor::new(buffer).read_ospg_quad())?;
        self.iter.next();
        Some(quad)
    }
}
/// Wraps a quad iterator and checks each yielded quad against a pattern.
pub struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
    iter: I,
    filter: EncodedQuadPattern,
}

impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuadsIterator<I> {
    type Item = Result<EncodedQuad>;

    /// Takes one item from the wrapped iterator and returns it if it is an
    /// error or a quad accepted by the pattern.
    ///
    /// A rejected quad makes this call return `None`: the wrapped iterator is
    /// not advanced further to look for a later match.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        let candidate = self.iter.next()?;
        let keep = match candidate {
            Ok(ref quad) => self.filter.filter(quad),
            Err(_) => true,
        };
        if keep {
            Some(candidate)
        } else {
            None
        }
    }
}
/// Wraps a quad iterator and keeps only the quads of one graph.
pub struct InGraphQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
    iter: I,
    graph_name: EncodedTerm,
}

impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for InGraphQuadsIterator<I> {
    type Item = Result<EncodedQuad>;

    /// Advances the wrapped iterator until it yields an error or a quad whose
    /// graph name equals `graph_name`, and returns that item. Quads of other
    /// graphs are skipped.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        let expected = &self.graph_name;
        for item in self.iter.by_ref() {
            let keep = match item {
                Ok(ref quad) => expected == &quad.graph_name,
                Err(_) => true,
            };
            if keep {
                return Some(item);
            }
        }
        None
    }
}

@ -1,5 +1,8 @@
use errors::*;
use model::*;
use std::fmt;
use std::iter::FromIterator;
use std::iter::Iterator;
use std::sync::Arc;
use store::numeric_encoder::*;
use store::Dataset;
@ -270,6 +273,41 @@ impl<S: EncodedQuadsStore> Dataset for StoreDataset<S> {
}
}
/// Prints the dataset with one quad per line.
///
/// A failure while listing or reading the quads is propagated with `?` and
/// therefore surfaces as a `fmt::Error`.
impl<S: EncodedQuadsStore> fmt::Display for StoreDataset<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let quads = self.iter()?;
        for quad in quads {
            writeln!(fmt, "{}", quad?)?;
        }
        Ok(())
    }
}
/// Builds a dataset on top of the default value of the underlying store.
impl<S: EncodedQuadsStore + Default> Default for StoreDataset<S> {
    fn default() -> Self {
        let store = S::default();
        Self::new_from_store(store)
    }
}
/// Collects owned quads into a dataset created with `Default`.
///
/// # Panics
///
/// Panics if the insertion of a quad fails.
impl<S: EncodedQuadsStore + Default> FromIterator<Quad> for StoreDataset<S> {
    fn from_iter<I: IntoIterator<Item = Quad>>(iter: I) -> Self {
        let dataset = Self::default();
        iter.into_iter().for_each(|quad| {
            dataset.insert(&quad).unwrap();
        });
        dataset
    }
}
/// Collects borrowed quads into a dataset created with `Default`.
///
/// # Panics
///
/// Panics if the insertion of a quad fails.
impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Quad> for StoreDataset<S> {
    fn from_iter<I: IntoIterator<Item = &'a Quad>>(iter: I) -> Self {
        let dataset = Self::default();
        iter.into_iter().for_each(|quad| {
            dataset.insert(quad).unwrap();
        });
        dataset
    }
}
pub struct StoreNamedGraph<S: EncodedQuadsStore> {
store: Arc<S>,
name: NamedOrBlankNode,
@ -279,13 +317,13 @@ pub struct StoreNamedGraph<S: EncodedQuadsStore> {
impl<S: EncodedQuadsStore> Graph for StoreNamedGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsForGraphIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectGraphIterator, S>;
type TriplesForSubjectPredicateIterator =
TriplesIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type TriplesForSubjectObjectIterator =
TriplesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateGraphIterator, S>;
type TriplesForPredicateObjectIterator =
TriplesIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectGraphIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsForGraphIterator, S>> {
@ -308,13 +346,13 @@ impl<S: EncodedQuadsStore> Graph for StoreNamedGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_predicate(
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate_graph(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_named_node(predicate)?,
@ -323,13 +361,13 @@ impl<S: EncodedQuadsStore> Graph for StoreNamedGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_object(
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object_graph(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_term(object)?,
@ -351,13 +389,13 @@ impl<S: EncodedQuadsStore> Graph for StoreNamedGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_predicate_object(
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object_graph(
&encoder.encode_named_node(predicate)?,
&encoder.encode_term(object)?,
@ -424,6 +462,15 @@ impl<S: EncodedQuadsStore> NamedGraph for StoreNamedGraph<S> {
}
}
/// Prints the graph with one triple per line.
///
/// A failure while listing or reading the triples is propagated with `?` and
/// therefore surfaces as a `fmt::Error`.
impl<S: EncodedQuadsStore> fmt::Display for StoreNamedGraph<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let triples = self.iter()?;
        for triple in triples {
            writeln!(fmt, "{}", triple?)?;
        }
        Ok(())
    }
}
pub struct StoreDefaultGraph<S: EncodedQuadsStore> {
store: Arc<S>,
}
@ -431,13 +478,13 @@ pub struct StoreDefaultGraph<S: EncodedQuadsStore> {
impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsForGraphIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectGraphIterator, S>;
type TriplesForSubjectPredicateIterator =
TriplesIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type TriplesForSubjectObjectIterator =
TriplesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateGraphIterator, S>;
type TriplesForPredicateObjectIterator =
TriplesIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectGraphIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsForGraphIterator, S>> {
@ -460,13 +507,13 @@ impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_predicate(
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate_graph(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_named_node(predicate)?,
@ -475,13 +522,13 @@ impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_object(
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object_graph(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_term(object)?,
@ -503,13 +550,13 @@ impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_predicate_object(
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectGraphIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object_graph(
&encoder.encode_named_node(predicate)?,
&encoder.encode_term(object)?,
@ -570,6 +617,41 @@ impl<S: EncodedQuadsStore> Graph for StoreDefaultGraph<S> {
}
}
/// Prints the graph with one triple per line.
///
/// A failure while listing or reading the triples is propagated with `?` and
/// therefore surfaces as a `fmt::Error`.
impl<S: EncodedQuadsStore> fmt::Display for StoreDefaultGraph<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let triples = self.iter()?;
        for triple in triples {
            writeln!(fmt, "{}", triple?)?;
        }
        Ok(())
    }
}
/// Builds the default graph of a dataset created with `Default`.
impl<S: EncodedQuadsStore + Default> Default for StoreDefaultGraph<S> {
    fn default() -> Self {
        let dataset = StoreDataset::<S>::default();
        dataset.default_graph()
    }
}
/// Collects owned triples into a graph created with `Default`.
///
/// # Panics
///
/// Panics if the insertion of a triple fails.
impl<S: EncodedQuadsStore + Default> FromIterator<Triple> for StoreDefaultGraph<S> {
    fn from_iter<I: IntoIterator<Item = Triple>>(iter: I) -> Self {
        let graph = Self::default();
        iter.into_iter().for_each(|triple| {
            graph.insert(&triple).unwrap();
        });
        graph
    }
}
/// Collects borrowed triples into a graph created with `Default`.
///
/// # Panics
///
/// Panics if the insertion of a triple fails.
impl<'a, S: EncodedQuadsStore + Default> FromIterator<&'a Triple> for StoreDefaultGraph<S> {
    fn from_iter<I: IntoIterator<Item = &'a Triple>>(iter: I) -> Self {
        let graph = Self::default();
        iter.into_iter().for_each(|triple| {
            graph.insert(triple).unwrap();
        });
        graph
    }
}
pub struct StoreUnionGraph<S: EncodedQuadsStore> {
store: Arc<S>,
}
@ -577,11 +659,13 @@ pub struct StoreUnionGraph<S: EncodedQuadsStore> {
impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
type TriplesIterator = TriplesIterator<S::QuadsIterator, S>;
type TriplesForSubjectIterator = TriplesIterator<S::QuadsForSubjectIterator, S>;
type TriplesForSubjectPredicateIterator =
TriplesIterator<S::QuadsForSubjectPredicateIterator, S>;
type TriplesForSubjectObjectIterator = TriplesIterator<S::QuadsForSubjectObjectIterator, S>;
type ObjectsForSubjectPredicateIterator =
ObjectsIterator<S::QuadsForSubjectPredicateIterator, S>;
type PredicatesForSubjectObjectIterator =
PredicatesIterator<S::QuadsForSubjectObjectIterator, S>;
type TriplesForPredicateIterator = TriplesIterator<S::QuadsForPredicateIterator, S>;
type TriplesForPredicateObjectIterator = TriplesIterator<S::QuadsForPredicateObjectIterator, S>;
type SubjectsForPredicateObjectIterator =
SubjectsIterator<S::QuadsForPredicateObjectIterator, S>;
type TriplesForObjectIterator = TriplesIterator<S::QuadsForObjectIterator, S>;
fn triples(&self) -> Result<TriplesIterator<S::QuadsIterator, S>> {
@ -603,13 +687,13 @@ impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_predicate(
fn objects_for_subject_predicate(
&self,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<TriplesIterator<S::QuadsForSubjectPredicateIterator, S>> {
) -> Result<ObjectsIterator<S::QuadsForSubjectPredicateIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(ObjectsIterator {
iter: self.store.quads_for_subject_predicate(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_named_node(predicate)?,
@ -617,13 +701,13 @@ impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_subject_object(
fn predicates_for_subject_object(
&self,
subject: &NamedOrBlankNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForSubjectObjectIterator, S>> {
) -> Result<PredicatesIterator<S::QuadsForSubjectObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(PredicatesIterator {
iter: self.store.quads_for_subject_object(
&encoder.encode_named_or_blank_node(subject)?,
&encoder.encode_term(object)?,
@ -643,13 +727,13 @@ impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
store: self.store.clone(),
})
}
fn triples_for_predicate_object(
fn subjects_for_predicate_object(
&self,
predicate: &NamedNode,
object: &Term,
) -> Result<TriplesIterator<S::QuadsForPredicateObjectIterator, S>> {
) -> Result<SubjectsIterator<S::QuadsForPredicateObjectIterator, S>> {
let encoder = self.store.encoder();
Ok(TriplesIterator {
Ok(SubjectsIterator {
iter: self.store.quads_for_predicate_object(
&encoder.encode_named_node(predicate)?,
&encoder.encode_term(object)?,
@ -682,11 +766,11 @@ impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
}
fn insert(&self, triple: &Triple) -> Result<()> {
unimplemented!()
Err("Union graph is not writable".into())
}
fn remove(&self, triple: &Triple) -> Result<()> {
unimplemented!()
Err("Union graph is not writable".into())
}
fn len(&self) -> Result<usize> {
@ -698,6 +782,15 @@ impl<S: EncodedQuadsStore> Graph for StoreUnionGraph<S> {
}
}
/// Prints the graph with one triple per line.
///
/// A failure while listing or reading the triples is propagated with `?` and
/// therefore surfaces as a `fmt::Error`.
impl<S: EncodedQuadsStore> fmt::Display for StoreUnionGraph<S> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        let triples = self.iter()?;
        for triple in triples {
            writeln!(fmt, "{}", triple?)?;
        }
        Ok(())
    }
}
pub struct DelegatingBytesStore<'a, S: 'a + BytesStore + Sized>(&'a S);
impl<'a, S: BytesStore> BytesStore for DelegatingBytesStore<'a, S> {
@ -745,3 +838,58 @@ impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
.map(|k| k.and_then(|quad| self.store.encoder().decode_triple(&quad)))
}
}
/// Maps an iterator of encoded quads to the decoded subject of each quad.
pub struct SubjectsIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
    iter: I,
    store: Arc<S>,
}

impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
    for SubjectsIterator<I, S>
{
    type Item = Result<NamedOrBlankNode>;

    /// Decodes the subject of the next quad with the store encoder; errors
    /// of the wrapped iterator are forwarded unchanged.
    fn next(&mut self) -> Option<Result<NamedOrBlankNode>> {
        match self.iter.next()? {
            Ok(quad) => {
                let subject = self
                    .store
                    .encoder()
                    .decode_named_or_blank_node(&quad.subject);
                Some(subject)
            }
            Err(error) => Some(Err(error)),
        }
    }
}
/// Maps an iterator of encoded quads to the decoded predicate of each quad.
pub struct PredicatesIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
    iter: I,
    store: Arc<S>,
}

impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
    for PredicatesIterator<I, S>
{
    type Item = Result<NamedNode>;

    /// Decodes the predicate of the next quad with the store encoder; errors
    /// of the wrapped iterator are forwarded unchanged.
    fn next(&mut self) -> Option<Result<NamedNode>> {
        match self.iter.next()? {
            Ok(quad) => {
                let predicate = self.store.encoder().decode_named_node(&quad.predicate);
                Some(predicate)
            }
            Err(error) => Some(Err(error)),
        }
    }
}
/// Maps an iterator of encoded quads to the decoded object of each quad.
pub struct ObjectsIterator<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> {
    iter: I,
    store: Arc<S>,
}

impl<I: Iterator<Item = Result<EncodedQuad>>, S: EncodedQuadsStore> Iterator
    for ObjectsIterator<I, S>
{
    type Item = Result<Term>;

    /// Decodes the object of the next quad with the store encoder; errors of
    /// the wrapped iterator are forwarded unchanged.
    fn next(&mut self) -> Option<Result<Term>> {
        match self.iter.next()? {
            Ok(quad) => {
                let object = self.store.encoder().decode_term(&quad.object);
                Some(object)
            }
            Err(error) => Some(Err(error)),
        }
    }
}

@ -17,7 +17,8 @@ use rudf::rio::turtle::read_turtle;
use rudf::sparql::algebra::Query;
use rudf::sparql::parser::read_sparql_query;
use rudf::store::isomorphism::GraphIsomorphism;
use rudf::store::memory::MemoryGraph;
use rudf::store::Graph;
use rudf::store::MemoryGraph;
use std::error::Error;
use std::fmt;
use url::Url;
@ -70,7 +71,7 @@ fn turtle_w3c_testsuite() {
match client.load_turtle(test.action.clone()) {
Ok(action_graph) => match client.load_turtle(test.result.clone().unwrap()) {
Ok(result_graph) => assert!(
action_graph.is_isomorphic(&result_graph),
action_graph.is_isomorphic(&result_graph).unwrap(),
"Failure on {}. Expected file:\n{}\nParsed file:\n{}\n",
test,
action_graph,
@ -95,7 +96,10 @@ fn turtle_w3c_testsuite() {
.unwrap_or_else(|| Ok(MemoryGraph::default()));
assert!(
action_graph.is_err()
|| !action_graph.unwrap().is_isomorphic(&result_graph.unwrap()),
|| !action_graph
.unwrap()
.is_isomorphic(&result_graph.unwrap())
.unwrap(),
"Failure on {}",
test
);
@ -276,6 +280,7 @@ impl<'a> Iterator for TestManifest<'a> {
let kind = match self
.graph
.object_for_subject_predicate(&test_subject, &rdf::TYPE)
.unwrap()
{
Some(Term::NamedNode(c)) => match c.value().split("#").last() {
Some(k) => k.to_string(),
@ -286,6 +291,7 @@ impl<'a> Iterator for TestManifest<'a> {
let name = match self
.graph
.object_for_subject_predicate(&test_subject, &mf::NAME)
.unwrap()
{
Some(Term::Literal(c)) => Some(c.value().to_string()),
_ => None,
@ -293,6 +299,7 @@ impl<'a> Iterator for TestManifest<'a> {
let comment = match self
.graph
.object_for_subject_predicate(&test_subject, &rdfs::COMMENT)
.unwrap()
{
Some(Term::Literal(c)) => Some(c.value().to_string()),
_ => None,
@ -300,6 +307,7 @@ impl<'a> Iterator for TestManifest<'a> {
let action = match self
.graph
.object_for_subject_predicate(&test_subject, &*mf::ACTION)
.unwrap()
{
Some(Term::NamedNode(n)) => n.url().clone(),
Some(_) => return Some(Err("invalid action".into())),
@ -308,6 +316,7 @@ impl<'a> Iterator for TestManifest<'a> {
let result = match self
.graph
.object_for_subject_predicate(&test_subject, &*mf::RESULT)
.unwrap()
{
Some(Term::NamedNode(n)) => Some(n.url().clone()),
Some(_) => return Some(Err("invalid result".into())),
@ -328,7 +337,10 @@ impl<'a> Iterator for TestManifest<'a> {
Some(url) => {
let manifest = NamedOrBlankNode::from(NamedNode::new(url.clone()));
match self.client.load_turtle(url) {
Ok(g) => self.graph.extend(g.into_iter()),
Ok(g) => g
.iter()
.unwrap()
.for_each(|g| self.graph.insert(&g.unwrap()).unwrap()),
Err(e) => return Some(Err(e.into())),
}
@ -336,11 +348,11 @@ impl<'a> Iterator for TestManifest<'a> {
match self
.graph
.object_for_subject_predicate(&manifest, &*mf::INCLUDE)
.unwrap()
{
Some(Term::BlankNode(list)) => {
self.manifests_to_do.extend(
self.graph
.values_for_list(list.clone().into())
RdfListIterator::iter(&self.graph, list.clone().into())
.flat_map(|m| match m {
Term::NamedNode(nm) => Some(nm.url().clone()),
_ => None,
@ -355,12 +367,19 @@ impl<'a> Iterator for TestManifest<'a> {
match self
.graph
.object_for_subject_predicate(&manifest, &*mf::ENTRIES)
.unwrap()
{
Some(Term::BlankNode(list)) => {
self.tests_to_do
.extend(self.graph.values_for_list(list.clone().into()));
self.tests_to_do.extend(RdfListIterator::iter(
&self.graph,
list.clone().into(),
));
}
Some(term) => {
return Some(Err(
format!("Invalid tests list. Got term {}", term).into()
))
}
Some(_) => return Some(Err("invalid tests list".into())),
None => (),
}
}
@ -371,3 +390,45 @@ impl<'a> Iterator for TestManifest<'a> {
}
}
}
/// Iterates over the elements of a list stored in a graph by following the
/// `rdf::FIRST` / `rdf::REST` properties from a root node.
pub struct RdfListIterator<'a, G: 'a + Graph> {
    graph: &'a G,
    /// Node of the list to read next, `None` once the list is finished.
    current_node: Option<NamedOrBlankNode>,
}

impl<'a, G: 'a + Graph> RdfListIterator<'a, G> {
    /// Starts the iteration of the list whose first node is `root`.
    fn iter(graph: &'a G, root: NamedOrBlankNode) -> RdfListIterator<'a, G> {
        let current_node = Some(root);
        RdfListIterator {
            graph,
            current_node,
        }
    }
}

impl<'a, G: 'a + Graph> Iterator for RdfListIterator<'a, G> {
    type Item = Term;

    /// Returns the `rdf::FIRST` value of the current node and moves to its
    /// `rdf::REST` node.
    ///
    /// The iteration ends when the rest is `rdf::NIL`, is missing or is
    /// neither a named node nor a blank node. A node without `rdf::FIRST`
    /// value makes the call return `None` without moving forward.
    ///
    /// # Panics
    ///
    /// Panics if a lookup in the graph fails.
    fn next(&mut self) -> Option<Term> {
        let current = self.current_node.clone()?;
        let result = self
            .graph
            .object_for_subject_predicate(&current, &rdf::FIRST)
            .unwrap()?
            .clone();
        let rest = self
            .graph
            .object_for_subject_predicate(&current, &rdf::REST)
            .unwrap();
        self.current_node = match rest {
            Some(Term::NamedNode(ref n)) if *n == *rdf::NIL => None,
            Some(Term::NamedNode(n)) => Some(n.clone().into()),
            Some(Term::BlankNode(n)) => Some(n.clone().into()),
            _ => None,
        };
        Some(result)
    }
}

Loading…
Cancel
Save