diff --git a/lib/src/store/numeric_encoder.rs b/lib/src/store/numeric_encoder.rs index 6b639d3f..2cd18cad 100644 --- a/lib/src/store/numeric_encoder.rs +++ b/lib/src/store/numeric_encoder.rs @@ -12,7 +12,7 @@ use siphasher::sip128::{Hasher128, SipHasher24}; use std::collections::HashMap; use std::hash::Hash; use std::hash::Hasher; -use std::io::Read; +use std::io::{Cursor, Read}; use std::mem::size_of; use std::str; @@ -800,6 +800,30 @@ pub fn write_gosp_quad(sink: &mut Vec, quad: &EncodedQuad) { write_term(sink, quad.predicate); } +#[derive(Clone, Copy)] +pub enum QuadEncoding { + SPOG, + POSG, + OSPG, + GSPO, + GPOS, + GOSP, +} + +impl QuadEncoding { + pub fn decode(self, buffer: &[u8]) -> Result { + let mut cursor = Cursor::new(&buffer); + match self { + QuadEncoding::SPOG => cursor.read_spog_quad(), + QuadEncoding::POSG => cursor.read_posg_quad(), + QuadEncoding::OSPG => cursor.read_ospg_quad(), + QuadEncoding::GSPO => cursor.read_gspo_quad(), + QuadEncoding::GPOS => cursor.read_gpos_quad(), + QuadEncoding::GOSP => cursor.read_gosp_quad(), + } + } +} + pub trait StrLookup { fn get_str(&self, id: StrHash) -> Result>; } diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 42b98470..1aea566d 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -2,10 +2,9 @@ use crate::model::*; use crate::sparql::{GraphPattern, PreparedQuery, QueryOptions, SimplePreparedQuery}; use crate::store::numeric_encoder::*; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore}; -use crate::{DatasetSyntax, Error, GraphSyntax, Result}; +use crate::{DatasetSyntax, GraphSyntax, Result}; use rocksdb::*; -use std::io::{BufRead, Cursor}; -use std::iter::{empty, once}; +use std::io::BufRead; use std::mem::take; use std::path::Path; use std::str; @@ -91,7 +90,7 @@ impl RocksDbStore { db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?), }; - let mut transaction = new.handle()?.auto_transaction(); + let mut transaction = new.handle().auto_transaction(); transaction.set_first_strings()?; transaction.commit()?; @@ -128,7 +127,7 @@ impl RocksDbStore { predicate: Option<&NamedNode>, object: Option<&Term>, graph_name: Option>, - ) -> Box> + 'a> + ) -> impl Iterator> + 'a where Self: 'a, { @@ -136,16 +135,15 @@ impl RocksDbStore { let predicate = predicate.map(|p| p.into()); let object = object.map(|o| o.into()); let graph_name = graph_name.map(|g| g.map_or(ENCODED_DEFAULT_GRAPH, |g| g.into())); - Box::new( - self.encoded_quads_for_pattern(subject, predicate, object, graph_name) - .map(move |quad| self.decode_quad(&quad?)), - ) + self.handle() + .encoded_quads_for_pattern(subject, predicate, object, graph_name) + .map(move |quad| self.decode_quad(&quad?)) } /// Checks if this store contains a given quad pub fn contains(&self, quad: &Quad) -> Result { let quad = quad.into(); - self.handle()?.contains(&quad) + self.handle().contains(&quad) } /// Executes a transaction. @@ -158,7 +156,7 @@ impl RocksDbStore { &'a self, f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>, ) -> Result<()> { - let mut transaction = self.handle()?.transaction(); + let mut transaction = self.handle().transaction(); f(&mut transaction)?; transaction.commit() } @@ -176,7 +174,7 @@ impl RocksDbStore { to_graph_name: Option<&NamedOrBlankNode>, base_iri: Option<&str>, ) -> Result<()> { - let mut transaction = self.handle()?.auto_transaction(); + let mut transaction = self.handle().auto_transaction(); load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?; transaction.commit() } @@ -193,14 +191,14 @@ impl RocksDbStore { syntax: DatasetSyntax, base_iri: Option<&str>, ) -> Result<()> { - let mut transaction = self.handle()?.auto_transaction(); + let mut transaction = self.handle().auto_transaction(); load_dataset(&mut transaction, reader, syntax, base_iri)?; transaction.commit() } /// Adds a quad to this store. pub fn insert(&self, quad: &Quad) -> Result<()> { - let mut transaction = self.handle()?.auto_transaction(); + let mut transaction = self.handle().auto_transaction(); let quad = transaction.encode_quad(quad)?; transaction.insert_encoded(&quad)?; transaction.commit() @@ -208,23 +206,23 @@ impl RocksDbStore { /// Removes a quad from this store. pub fn remove(&self, quad: &Quad) -> Result<()> { - let mut transaction = self.handle()?.auto_transaction(); + let mut transaction = self.handle().auto_transaction(); let quad = quad.into(); transaction.remove_encoded(&quad)?; transaction.commit() } - fn handle<'a>(&'a self) -> Result> { - Ok(RocksDbStoreHandle { + fn handle(&self) -> RocksDbStoreHandle<'_> { + RocksDbStoreHandle { db: &self.db, - id2str_cf: get_cf(&self.db, ID2STR_CF)?, - spog_cf: get_cf(&self.db, SPOG_CF)?, - posg_cf: get_cf(&self.db, POSG_CF)?, - ospg_cf: get_cf(&self.db, OSPG_CF)?, - gspo_cf: get_cf(&self.db, GSPO_CF)?, - gpos_cf: get_cf(&self.db, GPOS_CF)?, - gosp_cf: get_cf(&self.db, GOSP_CF)?, - }) + id2str_cf: get_cf(&self.db, ID2STR_CF), + spog_cf: get_cf(&self.db, SPOG_CF), + posg_cf: get_cf(&self.db, POSG_CF), + ospg_cf: get_cf(&self.db, OSPG_CF), + gspo_cf: get_cf(&self.db, GSPO_CF), + gpos_cf: get_cf(&self.db, GPOS_CF), + gosp_cf: get_cf(&self.db, GOSP_CF), + } } } @@ -232,7 +230,7 @@ impl StrLookup for RocksDbStore { fn get_str(&self, id: StrHash) -> Result> { Ok(self .db - .get_cf(get_cf(&self.db, ID2STR_CF)?, &id.to_be_bytes())? + .get_cf(get_cf(&self.db, ID2STR_CF), &id.to_be_bytes())? .map(String::from_utf8) .transpose()?) } @@ -246,116 +244,108 @@ impl ReadableEncodedStore for RocksDbStore { object: Option, graph_name: Option, ) -> Box> + 'a> { - let handle = match self.handle() { - Ok(handle) => handle, - Err(error) => return Box::new(once(Err(error))), - }; + Box::new( + self.handle() + .encoded_quads_for_pattern(subject, predicate, object, graph_name), + ) + } +} + +impl<'a> RocksDbStoreHandle<'a> { + fn transaction(&self) -> RocksDbTransaction<'a> { + RocksDbTransaction { + inner: RocksDbInnerTransaction { + handle: self.clone(), + batch: WriteBatch::default(), + buffer: Vec::default(), + }, + } + } + + fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> { + RocksDbAutoTransaction { + inner: RocksDbInnerTransaction { + handle: self.clone(), + batch: WriteBatch::default(), + buffer: Vec::default(), + }, + } + } + + fn contains(&self, quad: &EncodedQuad) -> Result { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); + write_spog_quad(&mut buffer, quad); + Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some()) + } + + fn encoded_quads_for_pattern( + &self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> DecodingIndexIterator<'a> { match subject { Some(subject) => match predicate { Some(predicate) => match object { Some(object) => match graph_name { - Some(graph_name) => { - let quad = EncodedQuad::new(subject, predicate, object, graph_name); - match handle.contains(&quad) { - Ok(true) => Box::new(once(Ok(quad))), - Ok(false) => Box::new(empty()), - Err(error) => Box::new(once(Err(error))), - } - } - None => Box::new( - handle.quads_for_subject_predicate_object(subject, predicate, object), - ), + Some(graph_name) => self + .spog_quads(encode_term_quad(subject, predicate, object, graph_name)), + None => self.quads_for_subject_predicate_object(subject, predicate, object), }, None => match graph_name { - Some(graph_name) => Box::new( - handle - .quads_for_subject_predicate_graph(subject, predicate, graph_name), - ), - None => Box::new(handle.quads_for_subject_predicate(subject, predicate)), + Some(graph_name) => { + self.quads_for_subject_predicate_graph(subject, predicate, graph_name) + } + None => self.quads_for_subject_predicate(subject, predicate), }, }, None => match object { Some(object) => match graph_name { - Some(graph_name) => Box::new( - handle.quads_for_subject_object_graph(subject, object, graph_name), - ), - None => Box::new(handle.quads_for_subject_object(subject, object)), - }, - None => match graph_name { Some(graph_name) => { - Box::new(handle.quads_for_subject_graph(subject, graph_name)) + self.quads_for_subject_object_graph(subject, object, graph_name) } - None => Box::new(handle.quads_for_subject(subject)), + None => self.quads_for_subject_object(subject, object), + }, + None => match graph_name { + Some(graph_name) => self.quads_for_subject_graph(subject, graph_name), + None => self.quads_for_subject(subject), }, }, }, None => match predicate { Some(predicate) => match object { Some(object) => match graph_name { - Some(graph_name) => Box::new( - handle.quads_for_predicate_object_graph(predicate, object, graph_name), - ), - None => Box::new(handle.quads_for_predicate_object(predicate, object)), - }, - None => match graph_name { Some(graph_name) => { - Box::new(handle.quads_for_predicate_graph(predicate, graph_name)) + self.quads_for_predicate_object_graph(predicate, object, graph_name) } - None => Box::new(handle.quads_for_predicate(predicate)), + + None => self.quads_for_predicate_object(predicate, object), + }, + None => match graph_name { + Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name), + None => self.quads_for_predicate(predicate), }, }, None => match object { Some(object) => match graph_name { - Some(graph_name) => { - Box::new(handle.quads_for_object_graph(object, graph_name)) - } - None => Box::new(handle.quads_for_object(object)), + Some(graph_name) => self.quads_for_object_graph(object, graph_name), + None => self.quads_for_object(object), }, None => match graph_name { - Some(graph_name) => Box::new(handle.quads_for_graph(graph_name)), - None => Box::new(handle.quads()), + Some(graph_name) => self.quads_for_graph(graph_name), + None => self.quads(), }, }, }, } } -} -impl<'a> RocksDbStoreHandle<'a> { - fn transaction(&self) -> RocksDbTransaction<'a> { - RocksDbTransaction { - inner: RocksDbInnerTransaction { - handle: self.clone(), - batch: WriteBatch::default(), - buffer: Vec::default(), - }, - } - } - - fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> { - RocksDbAutoTransaction { - inner: RocksDbInnerTransaction { - handle: self.clone(), - batch: WriteBatch::default(), - buffer: Vec::default(), - }, - } - } - - fn contains(&self, quad: &EncodedQuad) -> Result { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); - write_spog_quad(&mut buffer, quad); - Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some()) - } - - fn quads(&self) -> impl Iterator> + 'a { + fn quads(&self) -> DecodingIndexIterator<'a> { self.spog_quads(Vec::default()) } - fn quads_for_subject( - &self, - subject: EncodedTerm, - ) -> impl Iterator> + 'a { + fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator<'a> { self.spog_quads(encode_term(subject)) } @@ -363,7 +353,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, predicate: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.spog_quads(encode_term_pair(subject, predicate)) } @@ -372,7 +362,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, predicate: EncodedTerm, object: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.spog_quads(encode_term_triple(subject, predicate, object)) } @@ -380,14 +370,11 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, object: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.ospg_quads(encode_term_pair(object, subject)) } - fn quads_for_predicate( - &self, - predicate: EncodedTerm, - ) -> impl Iterator> + 'a { + fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator<'a> { self.posg_quads(encode_term(predicate)) } @@ -395,21 +382,15 @@ impl<'a> RocksDbStoreHandle<'a> { &self, predicate: EncodedTerm, object: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.posg_quads(encode_term_pair(predicate, object)) } - fn quads_for_object( - &self, - object: EncodedTerm, - ) -> impl Iterator> + 'a { + fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator<'a> { self.ospg_quads(encode_term(object)) } - fn quads_for_graph( - &self, - graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator<'a> { self.gspo_quads(encode_term(graph_name)) } @@ -417,7 +398,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gspo_quads(encode_term_pair(graph_name, subject)) } @@ -426,7 +407,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) } @@ -435,7 +416,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gosp_quads(encode_term_triple(graph_name, object, subject)) } @@ -443,7 +424,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gpos_quads(encode_term_pair(graph_name, predicate)) } @@ -452,7 +433,7 @@ impl<'a> RocksDbStoreHandle<'a> { predicate: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gpos_quads(encode_term_triple(graph_name, predicate, object)) } @@ -460,58 +441,46 @@ impl<'a> RocksDbStoreHandle<'a> { &self, object: EncodedTerm, graph_name: EncodedTerm, - ) -> impl Iterator> + 'a { + ) -> DecodingIndexIterator<'a> { self.gosp_quads(encode_term_pair(graph_name, object)) } - fn spog_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.spog_cf, prefix, |buffer| { - Cursor::new(buffer).read_spog_quad() - }) + fn spog_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.spog_cf, prefix, QuadEncoding::SPOG) } - fn posg_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.posg_cf, prefix, |buffer| { - Cursor::new(buffer).read_posg_quad() - }) + fn posg_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.posg_cf, prefix, QuadEncoding::POSG) } - fn ospg_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.ospg_cf, prefix, |buffer| { - Cursor::new(buffer).read_ospg_quad() - }) + fn ospg_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.ospg_cf, prefix, QuadEncoding::OSPG) } - fn gspo_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.gspo_cf, prefix, |buffer| { - Cursor::new(buffer).read_gspo_quad() - }) + fn gspo_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.gspo_cf, prefix, QuadEncoding::GSPO) } - fn gpos_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.gpos_cf, prefix, |buffer| { - Cursor::new(buffer).read_gpos_quad() - }) + fn gpos_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.gpos_cf, prefix, QuadEncoding::GPOS) } - fn gosp_quads(&self, prefix: Vec) -> impl Iterator> + 'a { - self.inner_quads(self.gosp_cf, prefix, |buffer| { - Cursor::new(buffer).read_gosp_quad() - }) + fn gosp_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { + self.inner_quads(self.gosp_cf, prefix, QuadEncoding::GOSP) } fn inner_quads( &self, cf: &ColumnFamily, prefix: Vec, - decode: impl Fn(&[u8]) -> Result + 'a, - ) -> impl Iterator> + 'a { + encoding: QuadEncoding, + ) -> DecodingIndexIterator<'a> { let mut iter = self.db.raw_iterator_cf(cf); iter.seek(&prefix); DecodingIndexIterator { iter, prefix, - decode, + encoding, } } } @@ -699,9 +668,10 @@ impl RocksDbInnerTransaction<'_> { } } -fn get_cf<'a>(db: &'a DB, name: &str) -> Result<&'a ColumnFamily> { +#[allow(clippy::option_expect_used)] +fn get_cf<'a>(db: &'a DB, name: &str) -> &'a ColumnFamily { db.cf_handle(name) - .ok_or_else(|| Error::msg(format!("column family {} not found", name))) + .expect("A column family that should exists in RocksDB does not exists") } fn encode_term(t: EncodedTerm) -> Vec { @@ -725,19 +695,28 @@ fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Vec< vec } -struct DecodingIndexIterator<'a, F: Fn(&[u8]) -> Result> { +fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: EncodedTerm) -> Vec { + let mut vec = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); + write_term(&mut vec, t1); + write_term(&mut vec, t2); + write_term(&mut vec, t3); + write_term(&mut vec, t4); + vec +} + +struct DecodingIndexIterator<'a> { iter: DBRawIterator<'a>, prefix: Vec, - decode: F, + encoding: QuadEncoding, } -impl<'a, F: Fn(&[u8]) -> Result> Iterator for DecodingIndexIterator<'a, F> { +impl<'a> Iterator for DecodingIndexIterator<'a> { type Item = Result; fn next(&mut self) -> Option> { if let Some(key) = self.iter.key() { if key.starts_with(&self.prefix) { - let result = (self.decode)(key); + let result = self.encoding.decode(key); self.iter.next(); Some(result) } else {