Fixes and simplifies RocksDB store quads lookup

pull/21/head
Tpt 5 years ago
parent 0f3208d8fa
commit f3a0242d41
  1. 483
      lib/src/store/rocksdb.rs

@ -245,35 +245,29 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
} }
impl<'a> RocksDbStoreConnection<'a> { impl<'a> RocksDbStoreConnection<'a> {
fn quads(&self) -> Result<SPOGIndexIterator> { fn quads(&self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.spog_quads(b"", EncodedQuadPattern::new(None, None, None, None))
iter.seek_to_first();
Ok(SPOGIndexIterator { iter })
} }
fn quads_for_subject( fn quads_for_subject(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.spog_quads(
iter.seek(&encode_term(subject)?); &encode_term(subject)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), None, None, None),
iter: SPOGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), None, None, None),
})
} }
fn quads_for_subject_predicate( fn quads_for_subject_predicate(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.spog_quads(
iter.seek(&encode_term_pair(subject, predicate)?); &encode_term_pair(subject, predicate)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), Some(predicate), None, None),
iter: SPOGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None),
})
} }
fn quads_for_subject_predicate_object( fn quads_for_subject_predicate_object(
@ -281,88 +275,74 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.spog_quads(
iter.seek(&encode_term_triple(subject, predicate, object)?); &encode_term_triple(subject, predicate, object)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None),
iter: SPOGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None),
})
} }
fn quads_for_subject_object( fn quads_for_subject_object(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.ospg_quads(
iter.seek(&encode_term_pair(object, subject)?); &encode_term_pair(object, subject)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), None, Some(object), None),
iter: OSPGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), None, Some(object), None),
})
} }
fn quads_for_predicate( fn quads_for_predicate(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.posg_cf)?; self.posg_quads(
iter.seek(&encode_term(predicate)?); &encode_term(predicate)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, Some(predicate), None, None),
iter: POSGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, Some(predicate), None, None),
})
} }
fn quads_for_predicate_object( fn quads_for_predicate_object(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.posg_quads(
iter.seek(&encode_term_pair(predicate, object)?); &encode_term_pair(predicate, object)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, Some(predicate), Some(object), None),
iter: POSGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None),
})
} }
fn quads_for_object( fn quads_for_object(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.ospg_cf)?; self.ospg_quads(
iter.seek(&encode_term(object)?); &encode_term(object)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, None, Some(object), None),
iter: OSPGIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, None, Some(object), None),
})
} }
fn quads_for_graph( fn quads_for_graph(
&self, &self,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?; self.gspo_quads(
iter.seek(&encode_term(graph_name)?); &encode_term(graph_name)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, None, None, Some(graph_name)),
iter: GSPOIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, None, None, Some(graph_name)),
})
} }
fn quads_for_subject_graph( fn quads_for_subject_graph(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?; self.gspo_quads(
iter.seek(&encode_term_pair(graph_name, subject)?); &encode_term_pair(graph_name, subject)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), None, None, Some(graph_name)),
iter: GSPOIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), None, None, Some(graph_name)),
})
} }
fn quads_for_subject_predicate_graph( fn quads_for_subject_predicate_graph(
@ -370,13 +350,11 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; self.gspo_quads(
iter.seek(&encode_term_triple(graph_name, subject, predicate)?); &encode_term_triple(graph_name, subject, predicate)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), Some(predicate), None, Some(graph_name)),
iter: GSPOIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, Some(graph_name)),
})
} }
fn quads_for_subject_object_graph( fn quads_for_subject_object_graph(
@ -384,26 +362,22 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GOSPIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; self.gosp_quads(
iter.seek(&encode_term_triple(graph_name, object, subject)?); &encode_term_triple(graph_name, object, subject)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(Some(subject), None, Some(object), Some(graph_name)),
iter: GOSPIndexIterator { iter }, )
filter: EncodedQuadPattern::new(Some(subject), None, Some(object), Some(graph_name)),
})
} }
fn quads_for_predicate_graph( fn quads_for_predicate_graph(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GPOSIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gpos_cf)?; self.gpos_quads(
iter.seek(&encode_term_pair(graph_name, predicate)?); &encode_term_pair(graph_name, predicate)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, Some(predicate), None, Some(graph_name)),
iter: GPOSIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, Some(predicate), None, Some(graph_name)),
})
} }
fn quads_for_predicate_object_graph( fn quads_for_predicate_object_graph(
@ -411,25 +385,114 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GPOSIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; self.gpos_quads(
iter.seek(&encode_term_triple(graph_name, predicate, object)?); &encode_term_triple(graph_name, predicate, object)?,
Ok(FilteringEncodedQuadsIterator { EncodedQuadPattern::new(None, Some(predicate), Some(object), Some(graph_name)),
iter: GPOSIndexIterator { iter }, )
filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), Some(graph_name)),
})
} }
fn quads_for_object_graph( fn quads_for_object_graph(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<GOSPIndexIterator>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; self.gosp_quads(
iter.seek(&encode_term_pair(graph_name, object)?); &encode_term_pair(graph_name, object)?,
EncodedQuadPattern::new(None, None, Some(object), Some(graph_name)),
)
}
/// Iterates over the quads stored in the SPOG column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins (an empty slice is used for a full scan).
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_spog_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn spog_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.spog_cf,
        start,
        |buffer| Cursor::new(buffer).read_spog_quad(),
        filter,
    )
}
/// Iterates over the quads stored in the POSG column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins.
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_posg_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn posg_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.posg_cf,
        start,
        |buffer| Cursor::new(buffer).read_posg_quad(),
        filter,
    )
}
/// Iterates over the quads stored in the OSPG column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins.
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_ospg_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn ospg_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.ospg_cf,
        start,
        |buffer| Cursor::new(buffer).read_ospg_quad(),
        filter,
    )
}
/// Iterates over the quads stored in the GSPO column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins.
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_gspo_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn gspo_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.gspo_cf,
        start,
        |buffer| Cursor::new(buffer).read_gspo_quad(),
        filter,
    )
}
/// Iterates over the quads stored in the GPOS column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins.
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_gpos_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn gpos_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.gpos_cf,
        start,
        |buffer| Cursor::new(buffer).read_gpos_quad(),
        filter,
    )
}
/// Iterates over the quads stored in the GOSP column family.
///
/// * `start` - key the underlying cursor is positioned on before iteration
///   begins.
/// * `filter` - pattern forwarded to `inner_quads`, which wraps the decoded
///   stream in a filtering iterator.
///
/// Every key is decoded with `read_gosp_quad`. Errors from `inner_quads` are
/// returned unchanged.
fn gosp_quads(
    &self,
    start: &[u8],
    filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
    // `start` is already a `&[u8]`: hand it over as is instead of re-borrowing it.
    self.inner_quads(
        self.gosp_cf,
        start,
        |buffer| Cursor::new(buffer).read_gosp_quad(),
        filter,
    )
}
fn inner_quads(
&self,
cf: ColumnFamily,
start: &[u8],
decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a,
filter: EncodedQuadPattern,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
let mut iter = self.store.db.raw_iterator_cf(cf)?;
iter.seek(&start);
Ok(FilteringEncodedQuadsIterator { Ok(FilteringEncodedQuadsIterator {
iter: GOSPIndexIterator { iter }, iter: DecodingIndexIterator { iter, decode },
filter: EncodedQuadPattern::new(None, None, Some(object), Some(graph_name)), filter,
}) })
} }
} }
@ -617,110 +680,21 @@ fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Resu
Ok(vec) Ok(vec)
} }
struct GSPOIndexIterator<'a> { struct DecodingIndexIterator<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> {
iter: DBRawIterator<'a>, iter: DBRawIterator<'a>,
decode: F,
} }
impl<'a> Iterator for GSPOIndexIterator<'a> { impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator<'a, F> {
type Item = Result<EncodedQuad>; type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next(); if self.iter.valid() {
unsafe { let result = unsafe { self.iter.key_inner().map(|buffer| (self.decode)(buffer)) };
//This is safe because we are not keeping the buffer self.iter.next();
self.iter result
.key_inner() } else {
.map(|buffer| Cursor::new(buffer).read_gspo_quad()) None
}
}
}
/// Iterator decoding the keys reached by a raw RocksDB cursor with
/// `read_gpos_quad`.
///
/// The cursor is expected to be positioned (e.g. with `seek`) by whoever
/// builds this struct; iteration starts at that position.
struct GPOSIndexIterator<'a> {
    /// Raw cursor whose current key is the next quad to yield.
    iter: DBRawIterator<'a>,
}

impl<'a> Iterator for GPOSIndexIterator<'a> {
    type Item = Result<EncodedQuad>;

    /// Yields the quad under the cursor, then moves the cursor forward.
    ///
    /// Returns `None` once the cursor is no longer valid.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        if !self.iter.valid() {
            return None;
        }
        // SAFETY: the borrowed key buffer is fully decoded inside the closure
        // and is not kept once the cursor moves.
        let quad = unsafe {
            self.iter
                .key_inner()
                .map(|buffer| Cursor::new(buffer).read_gpos_quad())
        };
        // Advance only after decoding: advancing first would skip the entry
        // the cursor was positioned on.
        self.iter.next();
        quad
    }
}
/// Iterator decoding the keys reached by a raw RocksDB cursor with
/// `read_gosp_quad`.
///
/// The cursor is expected to be positioned (e.g. with `seek`) by whoever
/// builds this struct; iteration starts at that position.
struct GOSPIndexIterator<'a> {
    /// Raw cursor whose current key is the next quad to yield.
    iter: DBRawIterator<'a>,
}

impl<'a> Iterator for GOSPIndexIterator<'a> {
    type Item = Result<EncodedQuad>;

    /// Yields the quad under the cursor, then moves the cursor forward.
    ///
    /// Returns `None` once the cursor is no longer valid.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        if !self.iter.valid() {
            return None;
        }
        // SAFETY: the borrowed key buffer is fully decoded inside the closure
        // and is not kept once the cursor moves.
        let quad = unsafe {
            self.iter
                .key_inner()
                .map(|buffer| Cursor::new(buffer).read_gosp_quad())
        };
        // Advance only after decoding: advancing first would skip the entry
        // the cursor was positioned on.
        self.iter.next();
        quad
    }
}
/// Iterator decoding the keys reached by a raw RocksDB cursor with
/// `read_spog_quad`.
///
/// The cursor is expected to be positioned (e.g. with `seek` or
/// `seek_to_first`) by whoever builds this struct; iteration starts at that
/// position.
struct SPOGIndexIterator<'a> {
    /// Raw cursor whose current key is the next quad to yield.
    iter: DBRawIterator<'a>,
}

impl<'a> Iterator for SPOGIndexIterator<'a> {
    type Item = Result<EncodedQuad>;

    /// Yields the quad under the cursor, then moves the cursor forward.
    ///
    /// Returns `None` once the cursor is no longer valid.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        if !self.iter.valid() {
            return None;
        }
        // SAFETY: the borrowed key buffer is fully decoded inside the closure
        // and is not kept once the cursor moves.
        let quad = unsafe {
            self.iter
                .key_inner()
                .map(|buffer| Cursor::new(buffer).read_spog_quad())
        };
        // Advance only after decoding: advancing first would skip the entry
        // the cursor was positioned on.
        self.iter.next();
        quad
    }
}
/// Iterator decoding the keys reached by a raw RocksDB cursor with
/// `read_posg_quad`.
///
/// The cursor is expected to be positioned (e.g. with `seek`) by whoever
/// builds this struct; iteration starts at that position.
struct POSGIndexIterator<'a> {
    /// Raw cursor whose current key is the next quad to yield.
    iter: DBRawIterator<'a>,
}

impl<'a> Iterator for POSGIndexIterator<'a> {
    type Item = Result<EncodedQuad>;

    /// Yields the quad under the cursor, then moves the cursor forward.
    ///
    /// Returns `None` once the cursor is no longer valid.
    fn next(&mut self) -> Option<Result<EncodedQuad>> {
        if !self.iter.valid() {
            return None;
        }
        // SAFETY: the borrowed key buffer is fully decoded inside the closure
        // and is not kept once the cursor moves.
        let quad = unsafe {
            self.iter
                .key_inner()
                .map(|buffer| Cursor::new(buffer).read_posg_quad())
        };
        // Advance only after decoding: advancing first would skip the entry
        // the cursor was positioned on.
        self.iter.next();
        quad
    }
}
struct OSPGIndexIterator<'a> {
iter: DBRawIterator<'a>,
}
impl<'a> Iterator for OSPGIndexIterator<'a> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
unsafe {
//This is safe because we are not keeping the buffer
self.iter
.key_inner()
.map(|buffer| Cursor::new(buffer).read_ospg_quad())
} }
} }
} }
@ -764,3 +738,116 @@ impl From<RocksString> for String {
val.deref().to_owned() val.deref().to_owned()
} }
} }
/// Inserts a single quad (default graph) into a fresh on-disk repository and
/// checks that `quads_for_pattern` returns exactly that quad for a range of
/// subject / predicate / object / graph combinations.
#[test]
fn repository() -> Result<()> {
    use crate::model::*;
    use crate::repository::RepositoryConnection;
    use rand::random;
    use std::env::temp_dir;
    use std::fs::remove_dir_all;
    use std::path::PathBuf;

    /// Best-effort removal of the repository directory when the test unwinds
    /// (failed assertion) or returns early through `?`.
    struct RemoveDirOnDrop(PathBuf);

    impl Drop for RemoveDirOnDrop {
        fn drop(&mut self) {
            // Errors are ignored on purpose: on the success path the directory
            // has already been removed explicitly at the end of the test.
            let _ = remove_dir_all(&self.0);
        }
    }

    let main_s = NamedOrBlankNode::from(BlankNode::default());
    let main_p = NamedNode::parse("http://example.com")?;
    let main_o = Term::from(Literal::from(1));

    let main_quad = Quad::new(main_s.clone(), main_p.clone(), main_o.clone(), None);

    // Random directory name so that concurrent runs do not share a store.
    let mut repo_path = temp_dir();
    repo_path.push(random::<u128>().to_string());

    // Declared before the repository so that it is dropped after it.
    let _cleanup = RemoveDirOnDrop(repo_path.clone());

    {
        let repository = RocksDbRepository::open(&repo_path)?;
        let mut connection = repository.connection()?;
        connection.insert(&main_quad)?;

        let target = vec![main_quad];
        assert_eq!(
            connection
                .quads_for_pattern(None, None, None, None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), None, None, None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), Some(&main_p), None, None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), Some(&main_p), None, Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), None, Some(&main_o), None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), None, Some(&main_o), Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(Some(&main_s), None, None, Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(None, Some(&main_p), None, None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(None, Some(&main_p), Some(&main_o), None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(None, None, Some(&main_o), None)
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(None, None, None, Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
        assert_eq!(
            connection
                .quads_for_pattern(None, Some(&main_p), Some(&main_o), Some(None))
                .collect::<Result<Vec<_>>>()?,
            target
        );
    }

    // Explicit removal is kept so that a cleanup failure still fails the test.
    remove_dir_all(&repo_path)?;
    Ok(())
}

Loading…
Cancel
Save