From f3a0242d4175453ab5b0dd0d00e5ed4d762d5a34 Mon Sep 17 00:00:00 2001 From: Tpt Date: Thu, 24 Oct 2019 23:32:06 +0200 Subject: [PATCH] Fixes and simplifies RocksDB store quads lookup --- lib/src/store/rocksdb.rs | 483 +++++++++++++++++++++++---------------- 1 file changed, 285 insertions(+), 198 deletions(-) diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index ca66fad5..fea55ae3 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -245,35 +245,29 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> { } impl<'a> RocksDbStoreConnection<'a> { - fn quads(&self) -> Result { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek_to_first(); - Ok(SPOGIndexIterator { iter }) + fn quads(&self) -> Result> + 'a> { + self.spog_quads(b"", EncodedQuadPattern::new(None, None, None, None)) } fn quads_for_subject( &self, subject: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term(subject)?); - Ok(FilteringEncodedQuadsIterator { - iter: SPOGIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), None, None, None), - }) + ) -> Result> + 'a> { + self.spog_quads( + &encode_term(subject)?, + EncodedQuadPattern::new(Some(subject), None, None, None), + ) } fn quads_for_subject_predicate( &self, subject: EncodedTerm, predicate: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term_pair(subject, predicate)?); - Ok(FilteringEncodedQuadsIterator { - iter: SPOGIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None), - }) + ) -> Result> + 'a> { + self.spog_quads( + &encode_term_pair(subject, predicate)?, + EncodedQuadPattern::new(Some(subject), Some(predicate), None, None), + ) } fn quads_for_subject_predicate_object( @@ -281,88 +275,74 @@ impl<'a> RocksDbStoreConnection<'a> { subject: EncodedTerm, predicate: EncodedTerm, object: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term_triple(subject, predicate, object)?); - Ok(FilteringEncodedQuadsIterator { - iter: SPOGIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None), - }) + ) -> Result> + 'a> { + self.spog_quads( + &encode_term_triple(subject, predicate, object)?, + EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None), + ) } fn quads_for_subject_object( &self, subject: EncodedTerm, object: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term_pair(object, subject)?); - Ok(FilteringEncodedQuadsIterator { - iter: OSPGIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), None, Some(object), None), - }) + ) -> Result> + 'a> { + self.ospg_quads( + &encode_term_pair(object, subject)?, + EncodedQuadPattern::new(Some(subject), None, Some(object), None), + ) } fn quads_for_predicate( &self, predicate: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.posg_cf)?; - iter.seek(&encode_term(predicate)?); - Ok(FilteringEncodedQuadsIterator { - iter: POSGIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, Some(predicate), None, None), - }) + ) -> Result> + 'a> { + self.posg_quads( + &encode_term(predicate)?, + EncodedQuadPattern::new(None, Some(predicate), None, None), + ) } fn quads_for_predicate_object( &self, predicate: EncodedTerm, object: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term_pair(predicate, object)?); - Ok(FilteringEncodedQuadsIterator { - iter: POSGIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None), - }) + ) -> Result> + 'a> { + self.posg_quads( + &encode_term_pair(predicate, object)?, + EncodedQuadPattern::new(None, Some(predicate), Some(object), None), + ) } fn quads_for_object( &self, object: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.ospg_cf)?; - iter.seek(&encode_term(object)?); - Ok(FilteringEncodedQuadsIterator { - iter: OSPGIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, None, Some(object), None), - }) + ) -> Result> + 'a> { + self.ospg_quads( + &encode_term(object)?, + EncodedQuadPattern::new(None, None, Some(object), None), + ) } fn quads_for_graph( &self, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?; - iter.seek(&encode_term(graph_name)?); - Ok(FilteringEncodedQuadsIterator { - iter: GSPOIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, None, None, Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gspo_quads( + &encode_term(graph_name)?, + EncodedQuadPattern::new(None, None, None, Some(graph_name)), + ) } fn quads_for_subject_graph( &self, subject: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?; - iter.seek(&encode_term_pair(graph_name, subject)?); - Ok(FilteringEncodedQuadsIterator { - iter: GSPOIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), None, None, Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gspo_quads( + &encode_term_pair(graph_name, subject)?, + EncodedQuadPattern::new(Some(subject), None, None, Some(graph_name)), + ) } fn quads_for_subject_predicate_graph( @@ -370,13 +350,11 @@ impl<'a> RocksDbStoreConnection<'a> { subject: EncodedTerm, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?; - iter.seek(&encode_term_triple(graph_name, subject, predicate)?); - Ok(FilteringEncodedQuadsIterator { - iter: GSPOIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gspo_quads( + &encode_term_triple(graph_name, subject, predicate)?, + EncodedQuadPattern::new(Some(subject), Some(predicate), None, Some(graph_name)), + ) } fn quads_for_subject_object_graph( @@ -384,26 +362,22 @@ impl<'a> RocksDbStoreConnection<'a> { subject: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; - iter.seek(&encode_term_triple(graph_name, object, subject)?); - Ok(FilteringEncodedQuadsIterator { - iter: GOSPIndexIterator { iter }, - filter: EncodedQuadPattern::new(Some(subject), None, Some(object), Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gosp_quads( + &encode_term_triple(graph_name, object, subject)?, + EncodedQuadPattern::new(Some(subject), None, Some(object), Some(graph_name)), + ) } fn quads_for_predicate_graph( &self, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gpos_cf)?; - iter.seek(&encode_term_pair(graph_name, predicate)?); - Ok(FilteringEncodedQuadsIterator { - iter: GPOSIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, Some(predicate), None, Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gpos_quads( + &encode_term_pair(graph_name, predicate)?, + EncodedQuadPattern::new(None, Some(predicate), None, Some(graph_name)), + ) } fn quads_for_predicate_object_graph( @@ -411,25 +385,114 @@ impl<'a> RocksDbStoreConnection<'a> { predicate: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; - iter.seek(&encode_term_triple(graph_name, predicate, object)?); - Ok(FilteringEncodedQuadsIterator { - iter: GPOSIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), Some(graph_name)), - }) + ) -> Result> + 'a> { + self.gpos_quads( + &encode_term_triple(graph_name, predicate, object)?, + EncodedQuadPattern::new(None, Some(predicate), Some(object), Some(graph_name)), + ) } fn quads_for_object_graph( &self, object: EncodedTerm, graph_name: EncodedTerm, - ) -> Result> { - let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?; - iter.seek(&encode_term_pair(graph_name, object)?); + ) -> Result> + 'a> { + self.gosp_quads( + &encode_term_pair(graph_name, object)?, + EncodedQuadPattern::new(None, None, Some(object), Some(graph_name)), + ) + } + + fn spog_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.spog_cf, + &start, + |buffer| Cursor::new(buffer).read_spog_quad(), + filter, + ) + } + + fn posg_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.posg_cf, + &start, + |buffer| Cursor::new(buffer).read_posg_quad(), + filter, + ) + } + + fn ospg_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.ospg_cf, + &start, + |buffer| Cursor::new(buffer).read_ospg_quad(), + filter, + ) + } + + fn gspo_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.gspo_cf, + &start, + |buffer| Cursor::new(buffer).read_gspo_quad(), + filter, + ) + } + + fn gpos_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.gpos_cf, + &start, + |buffer| Cursor::new(buffer).read_gpos_quad(), + filter, + ) + } + + fn gosp_quads( + &self, + start: &[u8], + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + self.inner_quads( + self.gosp_cf, + &start, + |buffer| Cursor::new(buffer).read_gosp_quad(), + filter, + ) + } + + fn inner_quads( + &self, + cf: ColumnFamily, + start: &[u8], + decode: impl Fn(&[u8]) -> Result + 'a, + filter: EncodedQuadPattern, + ) -> Result> + 'a> { + let mut iter = self.store.db.raw_iterator_cf(cf)?; + iter.seek(&start); Ok(FilteringEncodedQuadsIterator { - iter: GOSPIndexIterator { iter }, - filter: EncodedQuadPattern::new(None, None, Some(object), Some(graph_name)), + iter: DecodingIndexIterator { iter, decode }, + filter, }) } } @@ -617,110 +680,21 @@ fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Resu Ok(vec) } -struct GSPOIndexIterator<'a> { +struct DecodingIndexIterator<'a, F: Fn(&[u8]) -> Result> { iter: DBRawIterator<'a>, + decode: F, } -impl<'a> Iterator for GSPOIndexIterator<'a> { +impl<'a, F: Fn(&[u8]) -> Result> Iterator for DecodingIndexIterator<'a, F> { type Item = Result; fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_gspo_quad()) - } - } -} - -struct GPOSIndexIterator<'a> { - iter: DBRawIterator<'a>, -} - -impl<'a> Iterator for GPOSIndexIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_gpos_quad()) - } - } -} - -struct GOSPIndexIterator<'a> { - iter: DBRawIterator<'a>, -} - -impl<'a> Iterator for GOSPIndexIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_gosp_quad()) - } - } -} - -struct SPOGIndexIterator<'a> { - iter: DBRawIterator<'a>, -} - -impl<'a> Iterator for SPOGIndexIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_spog_quad()) - } - } -} - -struct POSGIndexIterator<'a> { - iter: DBRawIterator<'a>, -} - -impl<'a> Iterator for POSGIndexIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_posg_quad()) - } - } -} - -struct OSPGIndexIterator<'a> { - iter: DBRawIterator<'a>, -} - -impl<'a> Iterator for OSPGIndexIterator<'a> { - type Item = Result; - - fn next(&mut self) -> Option> { - self.iter.next(); - unsafe { - //This is safe because we are not keeping the buffer - self.iter - .key_inner() - .map(|buffer| Cursor::new(buffer).read_ospg_quad()) + if self.iter.valid() { + let result = unsafe { self.iter.key_inner().map(|buffer| (self.decode)(buffer)) }; + self.iter.next(); + result + } else { + None } } } @@ -764,3 +738,116 @@ impl From for String { val.deref().to_owned() } } + +#[test] +fn repository() -> Result<()> { + use crate::model::*; + use crate::repository::RepositoryConnection; + use rand::random; + use std::env::temp_dir; + use std::fs::remove_dir_all; + + let main_s = NamedOrBlankNode::from(BlankNode::default()); + let main_p = NamedNode::parse("http://example.com")?; + let main_o = Term::from(Literal::from(1)); + + let main_quad = Quad::new(main_s.clone(), main_p.clone(), main_o.clone(), None); + + let mut repo_path = temp_dir(); + repo_path.push(random::().to_string()); + + { + let repository = RocksDbRepository::open(&repo_path)?; + let mut connection = repository.connection()?; + connection.insert(&main_quad)?; + + let target = vec![main_quad]; + assert_eq!( + connection + .quads_for_pattern(None, None, None, None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), None, None, None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), Some(&main_p), None, None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), Some(None)) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), Some(&main_p), None, Some(None)) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), None, Some(&main_o), None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), None, Some(&main_o), Some(None)) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(Some(&main_s), None, None, Some(None)) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(None, Some(&main_p), None, None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(None, Some(&main_p), Some(&main_o), None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(None, None, Some(&main_o), None) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(None, None, None, Some(None)) + .collect::>>()?, + target + ); + assert_eq!( + connection + .quads_for_pattern(None, Some(&main_p), Some(&main_o), Some(None)) + .collect::>>()?, + target + ); + } + + remove_dir_all(&repo_path)?; + Ok(()) +}