Adds gspo, gpos and gosp indexes to RocksDB store

Adds also some optimization to reduce the number of allocations
pull/10/head
Tpt 5 years ago
parent b0988aa4b9
commit dbf9e6899e
  1. 2
      lib/src/sparql/plan_builder.rs
  2. 132
      lib/src/store/numeric_encoder.rs
  3. 260
      lib/src/store/rocksdb.rs

@ -638,7 +638,7 @@ impl<S: StringStore> PlanBuilder<S> {
"string", "string",
)? )?
} else { } else {
Err(format_err!("Not supported custom function {}", expression))? return Err(format_err!("Not supported custom function {}", expression));
} }
} }
}, },

@ -14,6 +14,7 @@ use rust_decimal::Decimal;
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
use std::mem::size_of;
use std::ops::Deref; use std::ops::Deref;
use std::str; use std::str;
use std::sync::PoisonError; use std::sync::PoisonError;
@ -65,10 +66,6 @@ pub trait StringStore {
impl<'a, S: StringStore> StringStore for &'a S { impl<'a, S: StringStore> StringStore for &'a S {
type StringType = S::StringType; type StringType = S::StringType;
fn insert_str(&self, value: &str) -> Result<u64> {
(*self).insert_str(value)
}
fn get_str(&self, id: u64) -> Result<Option<S::StringType>> { fn get_str(&self, id: u64) -> Result<Option<S::StringType>> {
(*self).get_str(id) (*self).get_str(id)
} }
@ -76,6 +73,10 @@ impl<'a, S: StringStore> StringStore for &'a S {
fn get_str_id(&self, value: &str) -> Result<Option<u64>> { fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
(*self).get_str_id(value) (*self).get_str_id(value)
} }
fn insert_str(&self, value: &str) -> Result<u64> {
(*self).insert_str(value)
}
} }
pub struct MemoryStringStore { pub struct MemoryStringStore {
@ -97,19 +98,6 @@ impl Default for MemoryStringStore {
impl StringStore for MemoryStringStore { impl StringStore for MemoryStringStore {
type StringType = String; type StringType = String;
fn insert_str(&self, value: &str) -> Result<u64> {
let mut id2str = self.id2str.write().map_err(MutexPoisonError::from)?;
let mut str2id = self.str2id.write().map_err(MutexPoisonError::from)?;
Ok(if let Some(id) = str2id.get(value) {
*id
} else {
let id = id2str.len() as u64;
id2str.push(value.to_string());
str2id.insert(value.to_string(), id);
id
})
}
fn get_str(&self, id: u64) -> Result<Option<String>> { fn get_str(&self, id: u64) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str //TODO: avoid copy by adding a lifetime limit to get_str
let id2str = self.id2str.read().map_err(MutexPoisonError::from)?; let id2str = self.id2str.read().map_err(MutexPoisonError::from)?;
@ -124,6 +112,19 @@ impl StringStore for MemoryStringStore {
let str2id = self.str2id.read().map_err(MutexPoisonError::from)?; let str2id = self.str2id.read().map_err(MutexPoisonError::from)?;
Ok(str2id.get(value).cloned()) Ok(str2id.get(value).cloned())
} }
fn insert_str(&self, value: &str) -> Result<u64> {
let mut id2str = self.id2str.write().map_err(MutexPoisonError::from)?;
let mut str2id = self.str2id.write().map_err(MutexPoisonError::from)?;
Ok(if let Some(id) = str2id.get(value) {
*id
} else {
let id = id2str.len() as u64;
id2str.push(value.to_string());
str2id.insert(value.to_string(), id);
id
})
}
} }
const TYPE_DEFAULT_GRAPH_ID: u8 = 0; const TYPE_DEFAULT_GRAPH_ID: u8 = 0;
@ -144,38 +145,38 @@ const TYPE_DATE_LITERAL: u8 = 15;
const TYPE_NAIVE_DATE_LITERAL: u8 = 16; const TYPE_NAIVE_DATE_LITERAL: u8 = 16;
const TYPE_NAIVE_TIME_LITERAL: u8 = 17; const TYPE_NAIVE_TIME_LITERAL: u8 = 17;
pub static ENCODED_DEFAULT_GRAPH: EncodedTerm = EncodedTerm::DefaultGraph; pub const ENCODED_DEFAULT_GRAPH: EncodedTerm = EncodedTerm::DefaultGraph;
pub static ENCODED_EMPTY_STRING_LITERAL: EncodedTerm = EncodedTerm::StringLiteral { pub const ENCODED_EMPTY_STRING_LITERAL: EncodedTerm = EncodedTerm::StringLiteral {
value_id: EMPTY_STRING_ID, value_id: EMPTY_STRING_ID,
}; };
pub static ENCODED_RDF_LANG_STRING_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_RDF_LANG_STRING_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: RDF_LANG_STRING_ID, iri_id: RDF_LANG_STRING_ID,
}; };
pub static ENCODED_XSD_STRING_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_STRING_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_STRING_ID, iri_id: XSD_STRING_ID,
}; };
pub static ENCODED_XSD_BOOLEAN_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_BOOLEAN_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_BOOLEAN_ID, iri_id: XSD_BOOLEAN_ID,
}; };
pub static ENCODED_XSD_FLOAT_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_FLOAT_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_FLOAT_ID, iri_id: XSD_FLOAT_ID,
}; };
pub static ENCODED_XSD_DOUBLE_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_DOUBLE_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_DOUBLE_ID, iri_id: XSD_DOUBLE_ID,
}; };
pub static ENCODED_XSD_INTEGER_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_INTEGER_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_INTEGER_ID, iri_id: XSD_INTEGER_ID,
}; };
pub static ENCODED_XSD_DECIMAL_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_DECIMAL_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_DECIMAL_ID, iri_id: XSD_DECIMAL_ID,
}; };
pub static ENCODED_XSD_DATE_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_DATE_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_DATE_ID, iri_id: XSD_DATE_ID,
}; };
pub static ENCODED_XSD_TIME_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_TIME_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_TIME_ID, iri_id: XSD_TIME_ID,
}; };
pub static ENCODED_XSD_DATE_TIME_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode { pub const ENCODED_XSD_DATE_TIME_NAMED_NODE: EncodedTerm = EncodedTerm::NamedNode {
iri_id: XSD_DATE_TIME_ID, iri_id: XSD_DATE_TIME_ID,
}; };
@ -397,6 +398,9 @@ pub trait TermReader {
fn read_spog_quad(&mut self) -> Result<EncodedQuad>; fn read_spog_quad(&mut self) -> Result<EncodedQuad>;
fn read_posg_quad(&mut self) -> Result<EncodedQuad>; fn read_posg_quad(&mut self) -> Result<EncodedQuad>;
fn read_ospg_quad(&mut self) -> Result<EncodedQuad>; fn read_ospg_quad(&mut self) -> Result<EncodedQuad>;
fn read_gspo_quad(&mut self) -> Result<EncodedQuad>;
fn read_gpos_quad(&mut self) -> Result<EncodedQuad>;
fn read_gosp_quad(&mut self) -> Result<EncodedQuad>;
} }
impl<R: Read> TermReader for R { impl<R: Read> TermReader for R {
@ -513,16 +517,60 @@ impl<R: Read> TermReader for R {
graph_name, graph_name,
}) })
} }
fn read_gspo_quad(&mut self) -> Result<EncodedQuad> {
let graph_name = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gpos_quad(&mut self) -> Result<EncodedQuad> {
let graph_name = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gosp_quad(&mut self) -> Result<EncodedQuad> {
let graph_name = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
} }
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<u64>();
pub trait TermWriter { pub trait TermWriter {
fn write_term(&mut self, term: EncodedTerm) -> Result<()>; fn write_term(&mut self, term: EncodedTerm) -> Result<()>;
fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()>; fn write_spog_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()>; fn write_posg_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()>; fn write_ospg_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
fn write_gspo_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
fn write_gpos_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
fn write_gosp_quad(&mut self, quad: &EncodedQuad) -> Result<()>;
} }
impl<R: Write> TermWriter for R { impl<W: Write> TermWriter for W {
fn write_term(&mut self, term: EncodedTerm) -> Result<()> { fn write_term(&mut self, term: EncodedTerm) -> Result<()> {
self.write_u8(term.type_id())?; self.write_u8(term.type_id())?;
match term { match term {
@ -598,6 +646,30 @@ impl<R: Write> TermWriter for R {
self.write_term(quad.graph_name)?; self.write_term(quad.graph_name)?;
Ok(()) Ok(())
} }
fn write_gspo_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.write_term(quad.graph_name)?;
self.write_term(quad.subject)?;
self.write_term(quad.predicate)?;
self.write_term(quad.object)?;
Ok(())
}
fn write_gpos_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.write_term(quad.graph_name)?;
self.write_term(quad.predicate)?;
self.write_term(quad.object)?;
self.write_term(quad.subject)?;
Ok(())
}
fn write_gosp_quad(&mut self, quad: &EncodedQuad) -> Result<()> {
self.write_term(quad.graph_name)?;
self.write_term(quad.object)?;
self.write_term(quad.subject)?;
self.write_term(quad.predicate)?;
Ok(())
}
} }
pub struct Encoder<S: StringStore> { pub struct Encoder<S: StringStore> {

@ -59,6 +59,9 @@ const STR2ID_CF: &str = "id2str";
const SPOG_CF: &str = "spog"; const SPOG_CF: &str = "spog";
const POSG_CF: &str = "posg"; const POSG_CF: &str = "posg";
const OSPG_CF: &str = "ospg"; const OSPG_CF: &str = "ospg";
const GSPO_CF: &str = "gspo";
const GPOS_CF: &str = "gpos";
const GOSP_CF: &str = "gosp";
const EMPTY_BUF: [u8; 0] = [0 as u8; 0]; const EMPTY_BUF: [u8; 0] = [0 as u8; 0];
@ -74,11 +77,15 @@ struct RocksDbStore {
#[derive(Clone)] #[derive(Clone)]
pub struct RocksDbStoreConnection<'a> { pub struct RocksDbStoreConnection<'a> {
store: &'a RocksDbStore, store: &'a RocksDbStore,
buffer: Vec<u8>,
id2str_cf: ColumnFamily<'a>, id2str_cf: ColumnFamily<'a>,
str2id_cf: ColumnFamily<'a>, str2id_cf: ColumnFamily<'a>,
spog_cf: ColumnFamily<'a>, spog_cf: ColumnFamily<'a>,
posg_cf: ColumnFamily<'a>, posg_cf: ColumnFamily<'a>,
ospg_cf: ColumnFamily<'a>, ospg_cf: ColumnFamily<'a>,
gspo_cf: ColumnFamily<'a>,
gpos_cf: ColumnFamily<'a>,
gosp_cf: ColumnFamily<'a>,
} }
impl RocksDbRepository { impl RocksDbRepository {
@ -119,11 +126,15 @@ impl<'a> Store for &'a RocksDbStore {
fn connection(self) -> Result<RocksDbStoreConnection<'a>> { fn connection(self) -> Result<RocksDbStoreConnection<'a>> {
Ok(RocksDbStoreConnection { Ok(RocksDbStoreConnection {
store: self, store: self,
buffer: Vec::default(),
id2str_cf: get_cf(&self.db, ID2STR_CF)?, id2str_cf: get_cf(&self.db, ID2STR_CF)?,
str2id_cf: get_cf(&self.db, STR2ID_CF)?, str2id_cf: get_cf(&self.db, STR2ID_CF)?,
spog_cf: get_cf(&self.db, SPOG_CF)?, spog_cf: get_cf(&self.db, SPOG_CF)?,
posg_cf: get_cf(&self.db, POSG_CF)?, posg_cf: get_cf(&self.db, POSG_CF)?,
ospg_cf: get_cf(&self.db, OSPG_CF)?, ospg_cf: get_cf(&self.db, OSPG_CF)?,
gspo_cf: get_cf(&self.db, GSPO_CF)?,
gpos_cf: get_cf(&self.db, GPOS_CF)?,
gosp_cf: get_cf(&self.db, GOSP_CF)?,
}) })
} }
} }
@ -131,6 +142,22 @@ impl<'a> Store for &'a RocksDbStore {
impl StringStore for RocksDbStoreConnection<'_> { impl StringStore for RocksDbStoreConnection<'_> {
type StringType = RocksString; type StringType = RocksString;
fn get_str(&self, id: u64) -> Result<Option<RocksString>> {
Ok(self
.store
.db
.get_cf(self.id2str_cf, &to_bytes(id))?
.map(|v| RocksString { vec: v }))
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
Ok(self
.store
.db
.get_cf(self.str2id_cf, value.as_bytes())?
.map(|id| LittleEndian::read_u64(&id)))
}
fn insert_str(&self, value: &str) -> Result<u64> { fn insert_str(&self, value: &str) -> Result<u64> {
Ok(if let Some(id) = self.get_str_id(value)? { Ok(if let Some(id) = self.get_str_id(value)? {
id id
@ -149,47 +176,73 @@ impl StringStore for RocksDbStoreConnection<'_> {
id id
}) })
} }
fn get_str(&self, id: u64) -> Result<Option<RocksString>> {
Ok(self
.store
.db
.get_cf(self.id2str_cf, &to_bytes(id))?
.map(|v| RocksString { vec: v }))
}
fn get_str_id(&self, value: &str) -> Result<Option<u64>> {
Ok(self
.store
.db
.get_cf(self.str2id_cf, value.as_bytes())?
.map(|id| LittleEndian::read_u64(&id)))
}
} }
impl<'a> StoreConnection for RocksDbStoreConnection<'a> { impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
.store buffer.write_spog_quad(quad)?;
.db Ok(self.store.db.get_cf(self.spog_cf, &buffer)?.is_some())
.get_cf(self.spog_cf, &encode_spog_quad(quad)?)?
.is_some())
} }
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; self.buffer.write_spog_quad(quad)?;
batch.put_cf(self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(self.spog_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_posg_quad(quad)?;
batch.put_cf(self.posg_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_ospg_quad(quad)?;
batch.put_cf(self.ospg_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gspo_quad(quad)?;
batch.put_cf(self.gspo_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gpos_quad(quad)?;
batch.put_cf(self.gpos_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gosp_quad(quad)?;
batch.put_cf(self.gosp_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.store.db.write(batch)?; //TODO: check what's going on if the key already exists self.store.db.write(batch)?; //TODO: check what's going on if the key already exists
Ok(()) Ok(())
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default(); let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &encode_spog_quad(quad)?)?;
batch.delete_cf(self.posg_cf, &encode_posg_quad(quad)?)?; self.buffer.write_spog_quad(quad)?;
batch.delete_cf(self.ospg_cf, &encode_ospg_quad(quad)?)?; batch.delete_cf(self.spog_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_posg_quad(quad)?;
batch.delete_cf(self.posg_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_ospg_quad(quad)?;
batch.delete_cf(self.ospg_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gspo_quad(quad)?;
batch.delete_cf(self.gspo_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gpos_quad(quad)?;
batch.delete_cf(self.gpos_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gosp_quad(quad)?;
batch.delete_cf(self.gosp_cf, &self.buffer)?;
self.buffer.clear();
self.store.db.write(batch)?; self.store.db.write(batch)?;
Ok(()) Ok(())
} }
@ -370,10 +423,12 @@ impl<'a> RocksDbStoreConnection<'a> {
fn quads_for_graph( fn quads_for_graph(
&self, &self,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<SPOGIndexIterator>> { ) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?;
iter: self.quads()?, iter.seek(&encode_term(graph_name)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GSPOIndexIterator { iter },
filter: EncodedQuadPattern::new(None, None, None, Some(graph_name)),
}) })
} }
@ -381,10 +436,12 @@ impl<'a> RocksDbStoreConnection<'a> {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gspo_cf)?;
iter: self.quads_for_subject(subject)?, iter.seek(&encode_term_pair(graph_name, subject)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GSPOIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), None, None, Some(graph_name)),
}) })
} }
@ -393,10 +450,12 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GSPOIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.spog_cf)?;
iter: self.quads_for_subject_predicate(subject, predicate)?, iter.seek(&encode_term_triple(graph_name, subject, predicate)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GSPOIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, Some(graph_name)),
}) })
} }
@ -405,10 +464,12 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GOSPIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?;
iter: self.quads_for_subject_object(subject, object)?, iter.seek(&encode_term_triple(graph_name, object, subject)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GOSPIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), None, Some(object), Some(graph_name)),
}) })
} }
@ -416,10 +477,12 @@ impl<'a> RocksDbStoreConnection<'a> {
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GPOSIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gpos_cf)?;
iter: self.quads_for_predicate(predicate)?, iter.seek(&encode_term_pair(graph_name, predicate)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GPOSIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), None, Some(graph_name)),
}) })
} }
@ -428,10 +491,12 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<POSGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GPOSIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?;
iter: self.quads_for_predicate_object(predicate, object)?, iter.seek(&encode_term_triple(graph_name, predicate, object)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GPOSIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), Some(graph_name)),
}) })
} }
@ -439,10 +504,12 @@ impl<'a> RocksDbStoreConnection<'a> {
&self, &self,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<InGraphQuadsIterator<FilteringEncodedQuadsIterator<OSPGIndexIterator>>> { ) -> Result<FilteringEncodedQuadsIterator<GOSPIndexIterator>> {
Ok(InGraphQuadsIterator { let mut iter = self.store.db.raw_iterator_cf(self.gosp_cf)?;
iter: self.quads_for_object(object)?, iter.seek(&encode_term_pair(graph_name, object)?);
graph_name, Ok(FilteringEncodedQuadsIterator {
iter: GOSPIndexIterator { iter },
filter: EncodedQuadPattern::new(None, None, Some(object), Some(graph_name)),
}) })
} }
} }
@ -527,42 +594,78 @@ impl EncodedQuadPattern {
} }
fn encode_term(t: EncodedTerm) -> Result<Vec<u8>> { fn encode_term(t: EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default(); let mut vec = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE);
vec.write_term(t)?; vec.write_term(t)?;
Ok(vec) Ok(vec)
} }
fn encode_term_pair(t1: EncodedTerm, t2: EncodedTerm) -> Result<Vec<u8>> { fn encode_term_pair(t1: EncodedTerm, t2: EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default(); let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE);
vec.write_term(t1)?; vec.write_term(t1)?;
vec.write_term(t2)?; vec.write_term(t2)?;
Ok(vec) Ok(vec)
} }
fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Result<Vec<u8>> { fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Result<Vec<u8>> {
let mut vec = Vec::default(); let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE);
vec.write_term(t1)?; vec.write_term(t1)?;
vec.write_term(t2)?; vec.write_term(t2)?;
vec.write_term(t3)?; vec.write_term(t3)?;
Ok(vec) Ok(vec)
} }
fn encode_spog_quad(quad: &EncodedQuad) -> Result<Vec<u8>> { struct GSPOIndexIterator<'a> {
let mut vec = Vec::default(); iter: DBRawIterator<'a>,
vec.write_spog_quad(quad)?;
Ok(vec)
} }
fn encode_posg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> { impl<'a> Iterator for GSPOIndexIterator<'a> {
let mut vec = Vec::default(); type Item = Result<EncodedQuad>;
vec.write_posg_quad(quad)?;
Ok(vec) fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
unsafe {
//This is safe because we are not keeping the buffer
self.iter
.key_inner()
.map(|buffer| Cursor::new(buffer).read_gspo_quad())
}
}
} }
fn encode_ospg_quad(quad: &EncodedQuad) -> Result<Vec<u8>> { struct GPOSIndexIterator<'a> {
let mut vec = Vec::default(); iter: DBRawIterator<'a>,
vec.write_ospg_quad(quad)?; }
Ok(vec)
impl<'a> Iterator for GPOSIndexIterator<'a> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
unsafe {
//This is safe because we are not keeping the buffer
self.iter
.key_inner()
.map(|buffer| Cursor::new(buffer).read_gpos_quad())
}
}
}
struct GOSPIndexIterator<'a> {
iter: DBRawIterator<'a>,
}
impl<'a> Iterator for GOSPIndexIterator<'a> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
unsafe {
//This is safe because we are not keeping the buffer
self.iter
.key_inner()
.map(|buffer| Cursor::new(buffer).read_gosp_quad())
}
}
} }
struct SPOGIndexIterator<'a> { struct SPOGIndexIterator<'a> {
@ -635,23 +738,6 @@ impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuads
} }
} }
struct InGraphQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
graph_name: EncodedTerm,
}
impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for InGraphQuadsIterator<I> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
let graph_name = &self.graph_name;
self.iter.find(|quad| match quad {
Ok(quad) => graph_name == &quad.graph_name,
Err(_) => true,
})
}
}
fn to_bytes(int: u64) -> [u8; 8] { fn to_bytes(int: u64) -> [u8; 8] {
let mut buf = [0 as u8; 8]; let mut buf = [0 as u8; 8];
LittleEndian::write_u64(&mut buf, int); LittleEndian::write_u64(&mut buf, int);

Loading…
Cancel
Save