Avoids box in RocksStore.quads_for_pattern

pull/35/head
Tpt 5 years ago
parent fd4655b4e8
commit 8b88a7576c
  1. 26
      lib/src/store/numeric_encoder.rs
  2. 291
      lib/src/store/rocksdb.rs

@ -12,7 +12,7 @@ use siphasher::sip128::{Hasher128, SipHasher24};
use std::collections::HashMap;
use std::hash::Hash;
use std::hash::Hasher;
use std::io::Read;
use std::io::{Cursor, Read};
use std::mem::size_of;
use std::str;
@ -800,6 +800,30 @@ pub fn write_gosp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.predicate);
}
#[derive(Clone, Copy)]
pub enum QuadEncoding {
SPOG,
POSG,
OSPG,
GSPO,
GPOS,
GOSP,
}
impl QuadEncoding {
pub fn decode(self, buffer: &[u8]) -> Result<EncodedQuad> {
let mut cursor = Cursor::new(&buffer);
match self {
QuadEncoding::SPOG => cursor.read_spog_quad(),
QuadEncoding::POSG => cursor.read_posg_quad(),
QuadEncoding::OSPG => cursor.read_ospg_quad(),
QuadEncoding::GSPO => cursor.read_gspo_quad(),
QuadEncoding::GPOS => cursor.read_gpos_quad(),
QuadEncoding::GOSP => cursor.read_gosp_quad(),
}
}
}
pub trait StrLookup {
fn get_str(&self, id: StrHash) -> Result<Option<String>>;
}

@ -2,10 +2,9 @@ use crate::model::*;
use crate::sparql::{GraphPattern, PreparedQuery, QueryOptions, SimplePreparedQuery};
use crate::store::numeric_encoder::*;
use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore};
use crate::{DatasetSyntax, Error, GraphSyntax, Result};
use crate::{DatasetSyntax, GraphSyntax, Result};
use rocksdb::*;
use std::io::{BufRead, Cursor};
use std::iter::{empty, once};
use std::io::BufRead;
use std::mem::take;
use std::path::Path;
use std::str;
@ -91,7 +90,7 @@ impl RocksDbStore {
db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?),
};
let mut transaction = new.handle()?.auto_transaction();
let mut transaction = new.handle().auto_transaction();
transaction.set_first_strings()?;
transaction.commit()?;
@ -128,7 +127,7 @@ impl RocksDbStore {
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<Option<&NamedOrBlankNode>>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
) -> impl Iterator<Item = Result<Quad>> + 'a
where
Self: 'a,
{
@ -136,16 +135,15 @@ impl RocksDbStore {
let predicate = predicate.map(|p| p.into());
let object = object.map(|o| o.into());
let graph_name = graph_name.map(|g| g.map_or(ENCODED_DEFAULT_GRAPH, |g| g.into()));
Box::new(
self.encoded_quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.decode_quad(&quad?)),
)
self.handle()
.encoded_quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.decode_quad(&quad?))
}
/// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> Result<bool> {
let quad = quad.into();
self.handle()?.contains(&quad)
self.handle().contains(&quad)
}
/// Executes a transaction.
@ -158,7 +156,7 @@ impl RocksDbStore {
&'a self,
f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>,
) -> Result<()> {
let mut transaction = self.handle()?.transaction();
let mut transaction = self.handle().transaction();
f(&mut transaction)?;
transaction.commit()
}
@ -176,7 +174,7 @@ impl RocksDbStore {
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let mut transaction = self.handle().auto_transaction();
load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?;
transaction.commit()
}
@ -193,14 +191,14 @@ impl RocksDbStore {
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let mut transaction = self.handle().auto_transaction();
load_dataset(&mut transaction, reader, syntax, base_iri)?;
transaction.commit()
}
/// Adds a quad to this store.
pub fn insert(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let mut transaction = self.handle().auto_transaction();
let quad = transaction.encode_quad(quad)?;
transaction.insert_encoded(&quad)?;
transaction.commit()
@ -208,23 +206,23 @@ impl RocksDbStore {
/// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let mut transaction = self.handle().auto_transaction();
let quad = quad.into();
transaction.remove_encoded(&quad)?;
transaction.commit()
}
fn handle<'a>(&'a self) -> Result<RocksDbStoreHandle<'a>> {
Ok(RocksDbStoreHandle {
fn handle(&self) -> RocksDbStoreHandle<'_> {
RocksDbStoreHandle {
db: &self.db,
id2str_cf: get_cf(&self.db, ID2STR_CF)?,
spog_cf: get_cf(&self.db, SPOG_CF)?,
posg_cf: get_cf(&self.db, POSG_CF)?,
ospg_cf: get_cf(&self.db, OSPG_CF)?,
gspo_cf: get_cf(&self.db, GSPO_CF)?,
gpos_cf: get_cf(&self.db, GPOS_CF)?,
gosp_cf: get_cf(&self.db, GOSP_CF)?,
})
id2str_cf: get_cf(&self.db, ID2STR_CF),
spog_cf: get_cf(&self.db, SPOG_CF),
posg_cf: get_cf(&self.db, POSG_CF),
ospg_cf: get_cf(&self.db, OSPG_CF),
gspo_cf: get_cf(&self.db, GSPO_CF),
gpos_cf: get_cf(&self.db, GPOS_CF),
gosp_cf: get_cf(&self.db, GOSP_CF),
}
}
}
@ -232,7 +230,7 @@ impl StrLookup for RocksDbStore {
fn get_str(&self, id: StrHash) -> Result<Option<String>> {
Ok(self
.db
.get_cf(get_cf(&self.db, ID2STR_CF)?, &id.to_be_bytes())?
.get_cf(get_cf(&self.db, ID2STR_CF), &id.to_be_bytes())?
.map(String::from_utf8)
.transpose()?)
}
@ -246,116 +244,108 @@ impl ReadableEncodedStore for RocksDbStore {
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
let handle = match self.handle() {
Ok(handle) => handle,
Err(error) => return Box::new(once(Err(error))),
};
Box::new(
self.handle()
.encoded_quads_for_pattern(subject, predicate, object, graph_name),
)
}
}
impl<'a> RocksDbStoreHandle<'a> {
fn transaction(&self) -> RocksDbTransaction<'a> {
RocksDbTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> {
RocksDbAutoTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_spog_quad(&mut buffer, quad);
Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some())
}
fn encoded_quads_for_pattern(
&self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> DecodingIndexIterator<'a> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match handle.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => Box::new(
handle.quads_for_subject_predicate_object(subject, predicate, object),
),
Some(graph_name) => self
.spog_quads(encode_term_quad(subject, predicate, object, graph_name)),
None => self.quads_for_subject_predicate_object(subject, predicate, object),
},
None => match graph_name {
Some(graph_name) => Box::new(
handle
.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => Box::new(handle.quads_for_subject_predicate(subject, predicate)),
Some(graph_name) => {
self.quads_for_subject_predicate_graph(subject, predicate, graph_name)
}
None => self.quads_for_subject_predicate(subject, predicate),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => Box::new(
handle.quads_for_subject_object_graph(subject, object, graph_name),
),
None => Box::new(handle.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
Box::new(handle.quads_for_subject_graph(subject, graph_name))
self.quads_for_subject_object_graph(subject, object, graph_name)
}
None => Box::new(handle.quads_for_subject(subject)),
None => self.quads_for_subject_object(subject, object),
},
None => match graph_name {
Some(graph_name) => self.quads_for_subject_graph(subject, graph_name),
None => self.quads_for_subject(subject),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => Box::new(
handle.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => Box::new(handle.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
Box::new(handle.quads_for_predicate_graph(predicate, graph_name))
self.quads_for_predicate_object_graph(predicate, object, graph_name)
}
None => Box::new(handle.quads_for_predicate(predicate)),
None => self.quads_for_predicate_object(predicate, object),
},
None => match graph_name {
Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name),
None => self.quads_for_predicate(predicate),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
Box::new(handle.quads_for_object_graph(object, graph_name))
}
None => Box::new(handle.quads_for_object(object)),
Some(graph_name) => self.quads_for_object_graph(object, graph_name),
None => self.quads_for_object(object),
},
None => match graph_name {
Some(graph_name) => Box::new(handle.quads_for_graph(graph_name)),
None => Box::new(handle.quads()),
Some(graph_name) => self.quads_for_graph(graph_name),
None => self.quads(),
},
},
},
}
}
}
impl<'a> RocksDbStoreHandle<'a> {
fn transaction(&self) -> RocksDbTransaction<'a> {
RocksDbTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> {
RocksDbAutoTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_spog_quad(&mut buffer, quad);
Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some())
}
fn quads(&self) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
fn quads(&self) -> DecodingIndexIterator<'a> {
self.spog_quads(Vec::default())
}
fn quads_for_subject(
&self,
subject: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator<'a> {
self.spog_quads(encode_term(subject))
}
@ -363,7 +353,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.spog_quads(encode_term_pair(subject, predicate))
}
@ -372,7 +362,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm,
predicate: EncodedTerm,
object: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.spog_quads(encode_term_triple(subject, predicate, object))
}
@ -380,14 +370,11 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
subject: EncodedTerm,
object: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.ospg_quads(encode_term_pair(object, subject))
}
fn quads_for_predicate(
&self,
predicate: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator<'a> {
self.posg_quads(encode_term(predicate))
}
@ -395,21 +382,15 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
predicate: EncodedTerm,
object: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.posg_quads(encode_term_pair(predicate, object))
}
fn quads_for_object(
&self,
object: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator<'a> {
self.ospg_quads(encode_term(object))
}
fn quads_for_graph(
&self,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator<'a> {
self.gspo_quads(encode_term(graph_name))
}
@ -417,7 +398,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
subject: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gspo_quads(encode_term_pair(graph_name, subject))
}
@ -426,7 +407,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm,
predicate: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate))
}
@ -435,7 +416,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gosp_quads(encode_term_triple(graph_name, object, subject))
}
@ -443,7 +424,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
predicate: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gpos_quads(encode_term_pair(graph_name, predicate))
}
@ -452,7 +433,7 @@ impl<'a> RocksDbStoreHandle<'a> {
predicate: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gpos_quads(encode_term_triple(graph_name, predicate, object))
}
@ -460,58 +441,46 @@ impl<'a> RocksDbStoreHandle<'a> {
&self,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
) -> DecodingIndexIterator<'a> {
self.gosp_quads(encode_term_pair(graph_name, object))
}
fn spog_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.spog_cf, prefix, |buffer| {
Cursor::new(buffer).read_spog_quad()
})
fn spog_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.spog_cf, prefix, QuadEncoding::SPOG)
}
fn posg_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.posg_cf, prefix, |buffer| {
Cursor::new(buffer).read_posg_quad()
})
fn posg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.posg_cf, prefix, QuadEncoding::POSG)
}
fn ospg_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.ospg_cf, prefix, |buffer| {
Cursor::new(buffer).read_ospg_quad()
})
fn ospg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.ospg_cf, prefix, QuadEncoding::OSPG)
}
fn gspo_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.gspo_cf, prefix, |buffer| {
Cursor::new(buffer).read_gspo_quad()
})
fn gspo_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.gspo_cf, prefix, QuadEncoding::GSPO)
}
fn gpos_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.gpos_cf, prefix, |buffer| {
Cursor::new(buffer).read_gpos_quad()
})
fn gpos_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.gpos_cf, prefix, QuadEncoding::GPOS)
}
fn gosp_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.inner_quads(self.gosp_cf, prefix, |buffer| {
Cursor::new(buffer).read_gosp_quad()
})
fn gosp_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> {
self.inner_quads(self.gosp_cf, prefix, QuadEncoding::GOSP)
}
fn inner_quads(
&self,
cf: &ColumnFamily,
prefix: Vec<u8>,
decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
encoding: QuadEncoding,
) -> DecodingIndexIterator<'a> {
let mut iter = self.db.raw_iterator_cf(cf);
iter.seek(&prefix);
DecodingIndexIterator {
iter,
prefix,
decode,
encoding,
}
}
}
@ -699,9 +668,10 @@ impl RocksDbInnerTransaction<'_> {
}
}
fn get_cf<'a>(db: &'a DB, name: &str) -> Result<&'a ColumnFamily> {
#[allow(clippy::option_expect_used)]
fn get_cf<'a>(db: &'a DB, name: &str) -> &'a ColumnFamily {
db.cf_handle(name)
.ok_or_else(|| Error::msg(format!("column family {} not found", name)))
.expect("A column family that should exists in RocksDB does not exists")
}
fn encode_term(t: EncodedTerm) -> Vec<u8> {
@ -725,19 +695,28 @@ fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Vec<
vec
}
struct DecodingIndexIterator<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> {
fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_term(&mut vec, t1);
write_term(&mut vec, t2);
write_term(&mut vec, t3);
write_term(&mut vec, t4);
vec
}
struct DecodingIndexIterator<'a> {
iter: DBRawIterator<'a>,
prefix: Vec<u8>,
decode: F,
encoding: QuadEncoding,
}
impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator<'a, F> {
impl<'a> Iterator for DecodingIndexIterator<'a> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
if let Some(key) = self.iter.key() {
if key.starts_with(&self.prefix) {
let result = (self.decode)(key);
let result = self.encoding.decode(key);
self.iter.next();
Some(result)
} else {

Loading…
Cancel
Save