diff --git a/lib/src/store/binary_encoder.rs b/lib/src/store/binary_encoder.rs index 98933b28..d989c39a 100644 --- a/lib/src/store/binary_encoder.rs +++ b/lib/src/store/binary_encoder.rs @@ -1,14 +1,11 @@ use crate::error::invalid_data_error; use crate::model::xsd::*; -use crate::store::numeric_encoder::StrHash; +use crate::store::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash}; use crate::store::small_string::SmallString; use std::io; use std::io::{Cursor, Read}; use std::mem::size_of; -type EncodedTerm = crate::store::numeric_encoder::EncodedTerm; -type EncodedQuad = crate::store::numeric_encoder::EncodedQuad; - pub const LATEST_STORAGE_VERSION: u64 = 1; pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::() + 2 * size_of::(); diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index 6c879da2..e9a61ea4 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -6,6 +6,7 @@ pub mod sled; pub(crate) mod small_string; #[cfg(feature = "sophia")] mod sophia; +pub(crate) mod storage; pub use crate::store::sled::SledStore; diff --git a/lib/src/store/sled.rs b/lib/src/store/sled.rs index 7ce046bb..b29a0964 100644 --- a/lib/src/store/sled.rs +++ b/lib/src/store/sled.rs @@ -1,27 +1,24 @@ //! Store based on the [Sled](https://sled.rs/) key-value database. -use crate::error::invalid_data_error; use crate::io::{DatasetFormat, GraphFormat}; use crate::model::*; use crate::sparql::{ evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update, UpdateOptions, }; -use crate::store::binary_encoder::*; use crate::store::numeric_encoder::{ - Decoder, ReadEncoder, StrContainer, StrEncodingAware, StrHash, StrLookup, WriteEncoder, + Decoder, EncodedQuad, EncodedTerm, ReadEncoder, StrContainer, StrEncodingAware, StrHash, + StrLookup, WriteEncoder, +}; +use crate::store::storage::*; +pub use crate::store::storage::{ + SledConflictableTransactionError, SledTransactionError, SledUnabortableTransactionError, }; use crate::store::{ dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph, - ReadableEncodedStore, StoreOrParseError, WritableEncodedStore, -}; -use sled::transaction::{ - ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, - UnabortableTransactionError, + ReadableEncodedStore, WritableEncodedStore, }; -use sled::{Config, Db, Iter, Tree}; use std::convert::TryInto; -use std::error::Error; use std::io::{BufRead, Write}; use std::iter::{once, Once}; use std::path::Path; @@ -62,97 +59,26 @@ use std::{fmt, io, str}; /// ``` #[derive(Clone)] pub struct SledStore { - default: Db, - id2str: Tree, - spog: Tree, - posg: Tree, - ospg: Tree, - gspo: Tree, - gpos: Tree, - gosp: Tree, - dspo: Tree, - dpos: Tree, - dosp: Tree, - graphs: Tree, + storage: Storage, } -type EncodedTerm = crate::store::numeric_encoder::EncodedTerm; -type EncodedQuad = crate::store::numeric_encoder::EncodedQuad; - //TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) impl SledStore { /// Creates a temporary [`SledStore`]() that will be deleted after drop. pub fn new() -> Result { - Self::do_open(&Config::new().temporary(true)) + Ok(Self { + storage: Storage::new()?, + }) } /// Opens a [`SledStore`]() and creates it if it does not exist yet. pub fn open(path: impl AsRef) -> Result { - Self::do_open(&Config::new().path(path)) - } - - fn do_open(config: &Config) -> Result { - let db = config.open()?; - let this = Self { - default: db.clone(), - id2str: db.open_tree("id2str")?, - spog: db.open_tree("spog")?, - posg: db.open_tree("posg")?, - ospg: db.open_tree("ospg")?, - gspo: db.open_tree("gspo")?, - gpos: db.open_tree("gpos")?, - gosp: db.open_tree("gosp")?, - dspo: db.open_tree("dspo")?, - dpos: db.open_tree("dpos")?, - dosp: db.open_tree("dosp")?, - graphs: db.open_tree("graphs")?, - }; - - let mut version = this.ensure_version()?; - if version == 0 { - // We migrate to v1 - for quad in this.encoded_quads_for_pattern(None, None, None, None) { - let mut this_mut = &this; - let quad = quad?; - if !quad.graph_name.is_default_graph() { - this_mut.insert_encoded_named_graph(quad.graph_name)?; - } - } - version = 1; - this.set_version(version)?; - this.graphs.flush()?; - } - - match version { - _ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!( - "The Sled database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version", - version - ))), - LATEST_STORAGE_VERSION => Ok(this), - _ => Err(invalid_data_error(format!( - "The Sled database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database", - version - ))) - } - } - - fn ensure_version(&self) -> Result { - Ok(if let Some(version) = self.default.get("oxversion")? { - let mut buffer = [0; 8]; - buffer.copy_from_slice(&version); - u64::from_be_bytes(buffer) - } else { - self.set_version(LATEST_STORAGE_VERSION)?; - LATEST_STORAGE_VERSION + Ok(Self { + storage: Storage::open(path.as_ref())?, }) } - fn set_version(&self, version: u64) -> Result<(), io::Error> { - self.default.insert("oxversion", &version.to_be_bytes())?; - Ok(()) - } - /// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/). /// /// Usage example: @@ -235,7 +161,7 @@ impl SledStore { /// Checks if this store contains a given quad pub fn contains<'a>(&self, quad: impl Into>) -> Result { if let Some(quad) = self.get_encoded_quad(quad.into())? { - self.contains_encoded(&quad) + self.storage.contains(&quad) } else { Ok(false) } @@ -245,12 +171,12 @@ impl SledStore { /// /// Warning: this function executes a full scan pub fn len(&self) -> usize { - self.gspo.len() + self.dspo.len() + self.storage.len() } /// Returns if the store is empty pub fn is_empty(&self) -> bool { - self.gspo.is_empty() && self.dspo.is_empty() + self.storage.is_empty() } /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/). @@ -325,36 +251,8 @@ impl SledStore { &self, f: impl Fn(SledTransaction<'_>) -> Result>, ) -> Result> { - Ok(( - &self.id2str, - &self.spog, - &self.posg, - &self.ospg, - &self.gspo, - &self.gpos, - &self.gosp, - &self.dspo, - &self.dpos, - &self.dosp, - &self.graphs, - ) - .transaction( - move |(id2str, spog, posg, ospg, gspo, gpos, gosp, dspo, dpos, dosp, graphs)| { - Ok(f(SledTransaction { - id2str, - spog, - posg, - ospg, - gspo, - gpos, - gosp, - dspo, - dpos, - dosp, - graphs, - })?) - }, - )?) + self.storage + .transaction(|storage| f(SledTransaction { storage })) } /// Loads a graph file (i.e. triples) into the store @@ -661,234 +559,6 @@ impl SledStore { let mut this = self; (&mut this).clear() } - - fn contains_encoded(&self, quad: &EncodedQuad) -> Result { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); - if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - Ok(self.dspo.contains_key(buffer)?) - } else { - write_gspo_quad(&mut buffer, quad); - Ok(self.gspo.contains_key(buffer)?) - } - } - fn quads(&self) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dspo_quads(Vec::default()), - self.gspo_quads(Vec::default()), - ) - } - - fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dspo_quads(encode_term(subject)), - self.spog_quads(encode_term(subject)), - ) - } - - fn quads_for_subject_predicate( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dspo_quads(encode_term_pair(subject, predicate)), - self.spog_quads(encode_term_pair(subject, predicate)), - ) - } - - fn quads_for_subject_predicate_object( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - object: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dspo_quads(encode_term_triple(subject, predicate, object)), - self.spog_quads(encode_term_triple(subject, predicate, object)), - ) - } - - fn quads_for_subject_object( - &self, - subject: EncodedTerm, - object: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dosp_quads(encode_term_pair(object, subject)), - self.ospg_quads(encode_term_pair(object, subject)), - ) - } - - fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dpos_quads(encode_term(predicate)), - self.posg_quads(encode_term(predicate)), - ) - } - - fn quads_for_predicate_object( - &self, - predicate: EncodedTerm, - object: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dpos_quads(encode_term_pair(predicate, object)), - self.posg_quads(encode_term_pair(predicate, object)), - ) - } - - fn quads_for_object(&self, object: EncodedTerm) -> DecodingQuadsIterator { - DecodingQuadsIterator::pair( - self.dosp_quads(encode_term(object)), - self.ospg_quads(encode_term(object)), - ) - } - - fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(Vec::default()) - } else { - self.gspo_quads(encode_term(graph_name)) - }) - } - - fn quads_for_subject_graph( - &self, - subject: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(encode_term(subject)) - } else { - self.gspo_quads(encode_term_pair(graph_name, subject)) - }) - } - - fn quads_for_subject_predicate_graph( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(encode_term_pair(subject, predicate)) - } else { - self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) - }) - } - - fn quads_for_subject_predicate_object_graph( - &self, - subject: EncodedTerm, - predicate: EncodedTerm, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dspo_quads(encode_term_triple(subject, predicate, object)) - } else { - self.gspo_quads(encode_term_quad(graph_name, subject, predicate, object)) - }) - } - - fn quads_for_subject_object_graph( - &self, - subject: EncodedTerm, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dosp_quads(encode_term_pair(object, subject)) - } else { - self.gosp_quads(encode_term_triple(graph_name, object, subject)) - }) - } - - fn quads_for_predicate_graph( - &self, - predicate: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dpos_quads(encode_term(predicate)) - } else { - self.gpos_quads(encode_term_pair(graph_name, predicate)) - }) - } - - fn quads_for_predicate_object_graph( - &self, - predicate: EncodedTerm, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dpos_quads(encode_term_pair(predicate, object)) - } else { - self.gpos_quads(encode_term_triple(graph_name, predicate, object)) - }) - } - - fn quads_for_object_graph( - &self, - object: EncodedTerm, - graph_name: EncodedTerm, - ) -> DecodingQuadsIterator { - DecodingQuadsIterator::new(if graph_name.is_default_graph() { - self.dosp_quads(encode_term(object)) - } else { - self.gosp_quads(encode_term_pair(graph_name, object)) - }) - } - - fn spog_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.spog, prefix, QuadEncoding::Spog) - } - - fn posg_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.posg, prefix, QuadEncoding::Posg) - } - - fn ospg_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.ospg, prefix, QuadEncoding::Ospg) - } - - fn gspo_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.gspo, prefix, QuadEncoding::Gspo) - } - - fn gpos_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.gpos, prefix, QuadEncoding::Gpos) - } - - fn gosp_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.gosp, prefix, QuadEncoding::Gosp) - } - - fn dspo_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.dspo, prefix, QuadEncoding::Dspo) - } - - fn dpos_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.dpos, prefix, QuadEncoding::Dpos) - } - - fn dosp_quads(&self, prefix: Vec) -> DecodingQuadIterator { - self.inner_quads(&self.dosp, prefix, QuadEncoding::Dosp) - } - - fn inner_quads( - &self, - tree: &Tree, - prefix: impl AsRef<[u8]>, - encoding: QuadEncoding, - ) -> DecodingQuadIterator { - DecodingQuadIterator { - iter: tree.scan_prefix(prefix), - encoding, - } - } } impl fmt::Display for SledStore { @@ -906,17 +576,13 @@ impl StrEncodingAware for SledStore { impl StrLookup for SledStore { fn get_str(&self, id: StrHash) -> Result, io::Error> { - self.id2str - .get(id.to_be_bytes())? - .map(|v| String::from_utf8(v.to_vec())) - .transpose() - .map_err(invalid_data_error) + self.storage.get_str(id) } fn get_str_id(&self, value: &str) -> Result, io::Error> { - let id = StrHash::new(value); - Ok(if self.id2str.contains_key(&id.to_be_bytes())? { - Some(id) + let key = StrHash::new(value); + Ok(if self.storage.contains_str(key)? { + Some(key) } else { None }) @@ -924,7 +590,7 @@ impl StrLookup for SledStore { } impl ReadableEncodedStore for SledStore { - type QuadsIter = DecodingQuadsIterator; + type QuadsIter = ChainedDecodingQuadIterator; type GraphsIter = DecodingGraphIterator; fn encoded_quads_for_pattern( @@ -933,230 +599,57 @@ impl ReadableEncodedStore for SledStore { predicate: Option, object: Option, graph_name: Option, - ) -> DecodingQuadsIterator { - match subject { - Some(subject) => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => self.quads_for_subject_predicate_object_graph( - subject, predicate, object, graph_name, - ), - None => self.quads_for_subject_predicate_object(subject, predicate, object), - }, - None => match graph_name { - Some(graph_name) => { - self.quads_for_subject_predicate_graph(subject, predicate, graph_name) - } - None => self.quads_for_subject_predicate(subject, predicate), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => { - self.quads_for_subject_object_graph(subject, object, graph_name) - } - None => self.quads_for_subject_object(subject, object), - }, - None => match graph_name { - Some(graph_name) => self.quads_for_subject_graph(subject, graph_name), - None => self.quads_for_subject(subject), - }, - }, - }, - None => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => { - self.quads_for_predicate_object_graph(predicate, object, graph_name) - } - None => self.quads_for_predicate_object(predicate, object), - }, - None => match graph_name { - Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name), - None => self.quads_for_predicate(predicate), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => self.quads_for_object_graph(object, graph_name), - None => self.quads_for_object(object), - }, - None => match graph_name { - Some(graph_name) => self.quads_for_graph(graph_name), - None => self.quads(), - }, - }, - }, - } + ) -> ChainedDecodingQuadIterator { + self.storage + .quads_for_pattern(subject, predicate, object, graph_name) } fn encoded_named_graphs(&self) -> DecodingGraphIterator { - DecodingGraphIterator { - iter: self.graphs.iter(), - } + self.storage.named_graphs() } fn contains_encoded_named_graph(&self, graph_name: EncodedTerm) -> Result { - Ok(self.graphs.contains_key(&encode_term(graph_name))?) + self.storage.contains_named_graph(graph_name) } } impl<'a> StrContainer for &'a SledStore { fn insert_str(&mut self, value: &str) -> Result { let key = StrHash::new(value); - self.id2str.insert(key.to_be_bytes().as_ref(), value)?; + self.storage.insert_str(key, value)?; Ok(key) } } impl<'a> WritableEncodedStore for &'a SledStore { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - - if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - self.dspo.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_pos_quad(&mut buffer, quad); - self.dpos.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_osp_quad(&mut buffer, quad); - self.dosp.insert(buffer.as_slice(), &[])?; - buffer.clear(); - } else { - write_spog_quad(&mut buffer, quad); - self.spog.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_posg_quad(&mut buffer, quad); - self.posg.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_ospg_quad(&mut buffer, quad); - self.ospg.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gspo_quad(&mut buffer, quad); - self.gspo.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gpos_quad(&mut buffer, quad); - self.gpos.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gosp_quad(&mut buffer, quad); - self.gosp.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_term(&mut buffer, quad.graph_name); - self.graphs.insert(&buffer, &[])?; - buffer.clear(); - } - - Ok(()) + self.storage.insert(quad) } fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - - if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - self.dspo.remove(buffer.as_slice())?; - buffer.clear(); - - write_pos_quad(&mut buffer, quad); - self.dpos.remove(buffer.as_slice())?; - buffer.clear(); - - write_osp_quad(&mut buffer, quad); - self.dosp.remove(buffer.as_slice())?; - buffer.clear(); - } else { - write_spog_quad(&mut buffer, quad); - self.spog.remove(buffer.as_slice())?; - buffer.clear(); - - write_posg_quad(&mut buffer, quad); - self.posg.remove(buffer.as_slice())?; - buffer.clear(); - - write_ospg_quad(&mut buffer, quad); - self.ospg.remove(buffer.as_slice())?; - buffer.clear(); - - write_gspo_quad(&mut buffer, quad); - self.gspo.remove(buffer.as_slice())?; - buffer.clear(); - - write_gpos_quad(&mut buffer, quad); - self.gpos.remove(buffer.as_slice())?; - buffer.clear(); - - write_gosp_quad(&mut buffer, quad); - self.gosp.remove(buffer.as_slice())?; - buffer.clear(); - } - - Ok(()) + self.storage.remove(quad) } fn insert_encoded_named_graph(&mut self, graph_name: EncodedTerm) -> Result<(), io::Error> { - self.graphs.insert(&encode_term(graph_name), &[])?; - Ok(()) + self.storage.insert_named_graph(graph_name) } fn clear_encoded_graph(&mut self, graph_name: EncodedTerm) -> Result<(), io::Error> { - if graph_name.is_default_graph() { - self.dspo.clear()?; - self.dpos.clear()?; - self.dosp.clear()?; - } else { - for quad in self.quads_for_graph(graph_name) { - self.remove_encoded(&quad?)?; - } - } - Ok(()) + self.storage.clear_graph(graph_name) } fn remove_encoded_named_graph(&mut self, graph_name: EncodedTerm) -> Result<(), io::Error> { - for quad in self.quads_for_graph(graph_name) { - self.remove_encoded(&quad?)?; - } - self.graphs.remove(&encode_term(graph_name))?; - Ok(()) + self.storage.remove_named_graph(graph_name) } fn clear(&mut self) -> Result<(), io::Error> { - self.dspo.clear()?; - self.dpos.clear()?; - self.dosp.clear()?; - self.gspo.clear()?; - self.gpos.clear()?; - self.gosp.clear()?; - self.spog.clear()?; - self.posg.clear()?; - self.ospg.clear()?; - self.graphs.clear()?; - self.id2str.clear()?; - Ok(()) + self.storage.clear() } } /// Allows inserting and deleting quads during an ACID transaction with the [`SledStore`]. pub struct SledTransaction<'a> { - id2str: &'a TransactionalTree, - spog: &'a TransactionalTree, - posg: &'a TransactionalTree, - ospg: &'a TransactionalTree, - gspo: &'a TransactionalTree, - gpos: &'a TransactionalTree, - gosp: &'a TransactionalTree, - dspo: &'a TransactionalTree, - dpos: &'a TransactionalTree, - dosp: &'a TransactionalTree, - graphs: &'a TransactionalTree, + storage: StorageTransaction<'a>, } impl SledTransaction<'_> { @@ -1283,17 +776,13 @@ impl<'a> StrEncodingAware for &'a SledTransaction<'a> { impl<'a> StrLookup for &'a SledTransaction<'a> { fn get_str(&self, id: StrHash) -> Result, SledUnabortableTransactionError> { - self.id2str - .get(id.to_be_bytes())? - .map(|v| String::from_utf8(v.to_vec())) - .transpose() - .map_err(|e| SledUnabortableTransactionError::Storage(invalid_data_error(e))) + self.storage.get_str(id) } fn get_str_id(&self, value: &str) -> Result, SledUnabortableTransactionError> { - let id = StrHash::new(value); - Ok(if self.id2str.get(&id.to_be_bytes())?.is_some() { - Some(id) + let key = StrHash::new(value); + Ok(if self.storage.contains_str(key)? { + Some(key) } else { None }) @@ -1303,7 +792,7 @@ impl<'a> StrLookup for &'a SledTransaction<'a> { impl<'a> StrContainer for &'a SledTransaction<'a> { fn insert_str(&mut self, value: &str) -> Result { let key = StrHash::new(value); - self.id2str.insert(key.to_be_bytes().as_ref(), value)?; + self.storage.insert_str(key, value)?; Ok(key) } } @@ -1313,106 +802,21 @@ impl<'a> WritableEncodedStore for &'a SledTransaction<'a> { &mut self, quad: &EncodedQuad, ) -> Result<(), SledUnabortableTransactionError> { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - - if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - self.dspo.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_pos_quad(&mut buffer, quad); - self.dpos.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_osp_quad(&mut buffer, quad); - self.dosp.insert(buffer.as_slice(), &[])?; - buffer.clear(); - } else { - write_spog_quad(&mut buffer, quad); - self.spog.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_posg_quad(&mut buffer, quad); - self.posg.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_ospg_quad(&mut buffer, quad); - self.ospg.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gspo_quad(&mut buffer, quad); - self.gspo.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gpos_quad(&mut buffer, quad); - self.gpos.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_gosp_quad(&mut buffer, quad); - self.gosp.insert(buffer.as_slice(), &[])?; - buffer.clear(); - - write_term(&mut buffer, quad.graph_name); - self.graphs.insert(buffer.as_slice(), &[])?; - buffer.clear(); - } - - Ok(()) + self.storage.insert(quad) } fn remove_encoded( &mut self, quad: &EncodedQuad, ) -> Result<(), SledUnabortableTransactionError> { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - - if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - self.dspo.remove(buffer.as_slice())?; - buffer.clear(); - - write_pos_quad(&mut buffer, quad); - self.dpos.remove(buffer.as_slice())?; - buffer.clear(); - - write_osp_quad(&mut buffer, quad); - self.dosp.remove(buffer.as_slice())?; - buffer.clear(); - } else { - write_spog_quad(&mut buffer, quad); - self.spog.remove(buffer.as_slice())?; - buffer.clear(); - - write_posg_quad(&mut buffer, quad); - self.posg.remove(buffer.as_slice())?; - buffer.clear(); - - write_ospg_quad(&mut buffer, quad); - self.ospg.remove(buffer.as_slice())?; - buffer.clear(); - - write_gspo_quad(&mut buffer, quad); - self.gspo.remove(buffer.as_slice())?; - buffer.clear(); - - write_gpos_quad(&mut buffer, quad); - self.gpos.remove(buffer.as_slice())?; - buffer.clear(); - - write_gosp_quad(&mut buffer, quad); - self.gosp.remove(buffer.as_slice())?; - buffer.clear(); - } - - Ok(()) + self.storage.remove(quad) } fn insert_encoded_named_graph( &mut self, graph_name: EncodedTerm, ) -> Result<(), SledUnabortableTransactionError> { - self.graphs.insert(encode_term(graph_name), &[])?; - Ok(()) + self.storage.insert_named_graph(graph_name) } fn clear_encoded_graph( @@ -1443,224 +847,6 @@ impl<'a> WritableEncodedStore for &'a SledTransaction<'a> { } } -/// Error returned by a Sled transaction -#[derive(Debug)] -pub enum SledTransactionError { - /// A failure returned by the API user that have aborted the transaction - Abort(T), - /// A storage related error - Storage(io::Error), -} - -impl fmt::Display for SledTransactionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Abort(e) => e.fmt(f), - Self::Storage(e) => e.fmt(f), - } - } -} - -impl Error for SledTransactionError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Self::Abort(e) => Some(e), - Self::Storage(e) => Some(e), - } - } -} - -impl From> for SledTransactionError { - fn from(e: TransactionError) -> Self { - match e { - TransactionError::Abort(e) => Self::Abort(e), - TransactionError::Storage(e) => Self::Storage(e.into()), - } - } -} - -impl> From> for io::Error { - fn from(e: SledTransactionError) -> Self { - match e { - SledTransactionError::Abort(e) => e.into(), - SledTransactionError::Storage(e) => e, - } - } -} - -/// An error returned from the transaction methods. -/// Should be returned as it is -#[derive(Debug)] -pub enum SledUnabortableTransactionError { - #[doc(hidden)] - Conflict, - /// A regular error - Storage(io::Error), -} - -impl fmt::Display for SledUnabortableTransactionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Conflict => write!(f, "Transaction conflict"), - Self::Storage(e) => e.fmt(f), - } - } -} - -impl Error for SledUnabortableTransactionError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Self::Storage(e) => Some(e), - _ => None, - } - } -} - -impl From for EvaluationError { - fn from(e: SledUnabortableTransactionError) -> Self { - match e { - SledUnabortableTransactionError::Storage(e) => Self::Io(e), - SledUnabortableTransactionError::Conflict => Self::Conflict, - } - } -} - -impl From> for SledUnabortableTransactionError { - fn from(e: StoreOrParseError) -> Self { - match e { - StoreOrParseError::Store(e) => e, - StoreOrParseError::Parse(e) => Self::Storage(e), - } - } -} - -impl From for SledUnabortableTransactionError { - fn from(e: UnabortableTransactionError) -> Self { - match e { - UnabortableTransactionError::Storage(e) => Self::Storage(e.into()), - UnabortableTransactionError::Conflict => Self::Conflict, - } - } -} - -/// An error returned from the transaction closure -#[derive(Debug)] -pub enum SledConflictableTransactionError { - /// A failure returned by the user that will abort the transaction - Abort(T), - #[doc(hidden)] - Conflict, - /// A storage related error - Storage(io::Error), -} - -impl fmt::Display for SledConflictableTransactionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Conflict => write!(f, "Transaction conflict"), - Self::Storage(e) => e.fmt(f), - Self::Abort(e) => e.fmt(f), - } - } -} - -impl Error for SledConflictableTransactionError { - fn source(&self) -> Option<&(dyn Error + 'static)> { - match self { - Self::Abort(e) => Some(e), - Self::Storage(e) => Some(e), - _ => None, - } - } -} - -impl From for SledConflictableTransactionError { - fn from(e: SledUnabortableTransactionError) -> Self { - match e { - SledUnabortableTransactionError::Storage(e) => Self::Storage(e), - SledUnabortableTransactionError::Conflict => Self::Conflict, - } - } -} - -impl From> for ConflictableTransactionError { - fn from(e: SledConflictableTransactionError) -> Self { - match e { - SledConflictableTransactionError::Abort(e) => ConflictableTransactionError::Abort(e), - SledConflictableTransactionError::Conflict => ConflictableTransactionError::Conflict, - SledConflictableTransactionError::Storage(e) => { - ConflictableTransactionError::Storage(e.into()) - } - } - } -} - -pub(crate) struct DecodingQuadsIterator { - first: DecodingQuadIterator, - second: Option, -} - -impl DecodingQuadsIterator { - fn new(first: DecodingQuadIterator) -> Self { - Self { - first, - second: None, - } - } - - fn pair(first: DecodingQuadIterator, second: DecodingQuadIterator) -> Self { - Self { - first, - second: Some(second), - } - } -} - -impl Iterator for DecodingQuadsIterator { - type Item = Result; - - fn next(&mut self) -> Option> { - if let Some(result) = self.first.next() { - Some(result) - } else if let Some(second) = self.second.as_mut() { - second.next() - } else { - None - } - } -} - -pub(crate) struct DecodingQuadIterator { - iter: Iter, - encoding: QuadEncoding, -} - -impl Iterator for DecodingQuadIterator { - type Item = Result; - - fn next(&mut self) -> Option> { - Some(match self.iter.next()? { - Ok((encoded, _)) => self.encoding.decode(&encoded), - Err(error) => Err(error.into()), - }) - } -} - -pub(crate) struct DecodingGraphIterator { - iter: Iter, -} - -impl Iterator for DecodingGraphIterator { - type Item = Result; - - fn next(&mut self) -> Option> { - Some(match self.iter.next()? { - Ok((encoded, _)) => decode_term(&encoded), - Err(error) => Err(error.into()), - }) - } -} - /// An iterator returning the quads contained in a [`SledStore`]. pub struct SledQuadIter { inner: QuadIterInner, @@ -1668,7 +854,7 @@ pub struct SledQuadIter { enum QuadIterInner { Quads { - iter: DecodingQuadsIterator, + iter: ChainedDecodingQuadIterator, store: SledStore, }, Error(Once), diff --git a/lib/src/store/storage.rs b/lib/src/store/storage.rs new file mode 100644 index 00000000..b0343c2f --- /dev/null +++ b/lib/src/store/storage.rs @@ -0,0 +1,951 @@ +use crate::error::invalid_data_error; +use crate::sparql::EvaluationError; +use crate::store::binary_encoder::*; +use crate::store::numeric_encoder::*; +use crate::store::StoreOrParseError; +use sled::transaction::{ + ConflictableTransactionError, TransactionError, TransactionalTree, UnabortableTransactionError, +}; +use sled::{Config, Db, Iter, Transactional, Tree}; +use std::error::Error; +use std::fmt; +use std::io; +use std::path::Path; + +/// Low level storage primitives +#[derive(Clone)] +pub struct Storage { + default: Db, + id2str: Tree, + spog: Tree, + posg: Tree, + ospg: Tree, + gspo: Tree, + gpos: Tree, + gosp: Tree, + dspo: Tree, + dpos: Tree, + dosp: Tree, + graphs: Tree, +} + +impl Storage { + pub fn new() -> Result { + Self::do_open(&Config::new().temporary(true)) + } + + pub fn open(path: &Path) -> Result { + Self::do_open(&Config::new().path(path)) + } + + fn do_open(config: &Config) -> Result { + let db = config.open()?; + let this = Self { + default: db.clone(), + id2str: db.open_tree("id2str")?, + spog: db.open_tree("spog")?, + posg: db.open_tree("posg")?, + ospg: db.open_tree("ospg")?, + gspo: db.open_tree("gspo")?, + gpos: db.open_tree("gpos")?, + gosp: db.open_tree("gosp")?, + dspo: db.open_tree("dspo")?, + dpos: db.open_tree("dpos")?, + dosp: db.open_tree("dosp")?, + graphs: db.open_tree("graphs")?, + }; + + let mut version = this.ensure_version()?; + if version == 0 { + // We migrate to v1 + for quad in this.quads() { + let quad = quad?; + if !quad.graph_name.is_default_graph() { + this.insert_named_graph(quad.graph_name)?; + } + } + version = 1; + this.set_version(version)?; + this.graphs.flush()?; + } + + match version { + _ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!( + "The Sled database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version", + version + ))), + LATEST_STORAGE_VERSION => Ok(this), + _ => Err(invalid_data_error(format!( + "The Sled database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database", + version + ))) + } + } + + fn ensure_version(&self) -> Result { + Ok(if let Some(version) = self.default.get("oxversion")? { + let mut buffer = [0; 8]; + buffer.copy_from_slice(&version); + u64::from_be_bytes(buffer) + } else { + self.set_version(LATEST_STORAGE_VERSION)?; + LATEST_STORAGE_VERSION + }) + } + + fn set_version(&self, version: u64) -> Result<(), io::Error> { + self.default.insert("oxversion", &version.to_be_bytes())?; + Ok(()) + } + + pub fn transaction( + &self, + f: impl Fn(StorageTransaction<'_>) -> Result>, + ) -> Result> { + Ok(( + &self.id2str, + &self.spog, + &self.posg, + &self.ospg, + &self.gspo, + &self.gpos, + &self.gosp, + &self.dspo, + &self.dpos, + &self.dosp, + &self.graphs, + ) + .transaction( + move |(id2str, spog, posg, ospg, gspo, gpos, gosp, dspo, dpos, dosp, graphs)| { + Ok(f(StorageTransaction { + id2str, + spog, + posg, + ospg, + gspo, + gpos, + gosp, + dspo, + dpos, + dosp, + graphs, + })?) + }, + )?) + } + + pub fn len(&self) -> usize { + self.gspo.len() + self.dspo.len() + } + + pub fn is_empty(&self) -> bool { + self.gspo.is_empty() && self.dspo.is_empty() + } + + pub fn contains(&self, quad: &EncodedQuad) -> Result { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); + if quad.graph_name.is_default_graph() { + write_spo_quad(&mut buffer, quad); + Ok(self.dspo.contains_key(buffer)?) + } else { + write_gspo_quad(&mut buffer, quad); + Ok(self.gspo.contains_key(buffer)?) + } + } + + pub fn quads_for_pattern( + &self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> ChainedDecodingQuadIterator { + match subject { + Some(subject) => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => self.quads_for_subject_predicate_object_graph( + subject, predicate, object, graph_name, + ), + None => self.quads_for_subject_predicate_object(subject, predicate, object), + }, + None => match graph_name { + Some(graph_name) => { + self.quads_for_subject_predicate_graph(subject, predicate, graph_name) + } + None => self.quads_for_subject_predicate(subject, predicate), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => { + self.quads_for_subject_object_graph(subject, object, graph_name) + } + None => self.quads_for_subject_object(subject, object), + }, + None => match graph_name { + Some(graph_name) => self.quads_for_subject_graph(subject, graph_name), + None => self.quads_for_subject(subject), + }, + }, + }, + None => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => { + self.quads_for_predicate_object_graph(predicate, object, graph_name) + } + None => self.quads_for_predicate_object(predicate, object), + }, + None => match graph_name { + Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name), + None => self.quads_for_predicate(predicate), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => self.quads_for_object_graph(object, graph_name), + None => self.quads_for_object(object), + }, + None => match graph_name { + Some(graph_name) => self.quads_for_graph(graph_name), + None => self.quads(), + }, + }, + }, + } + } + + pub fn quads(&self) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dspo_quads(Vec::default()), + self.gspo_quads(Vec::default()), + ) + } + + fn quads_for_subject(&self, subject: EncodedTerm) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dspo_quads(encode_term(subject)), + self.spog_quads(encode_term(subject)), + ) + } + + fn quads_for_subject_predicate( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dspo_quads(encode_term_pair(subject, predicate)), + self.spog_quads(encode_term_pair(subject, predicate)), + ) + } + + fn quads_for_subject_predicate_object( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + object: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dspo_quads(encode_term_triple(subject, predicate, object)), + self.spog_quads(encode_term_triple(subject, predicate, object)), + ) + } + + fn quads_for_subject_object( + &self, + subject: EncodedTerm, + object: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dosp_quads(encode_term_pair(object, subject)), + self.ospg_quads(encode_term_pair(object, subject)), + ) + } + + fn quads_for_predicate(&self, predicate: EncodedTerm) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dpos_quads(encode_term(predicate)), + self.posg_quads(encode_term(predicate)), + ) + } + + fn quads_for_predicate_object( + &self, + predicate: EncodedTerm, + object: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dpos_quads(encode_term_pair(predicate, object)), + self.posg_quads(encode_term_pair(predicate, object)), + ) + } + + fn quads_for_object(&self, object: EncodedTerm) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::pair( + self.dosp_quads(encode_term(object)), + self.ospg_quads(encode_term(object)), + ) + } + + fn quads_for_graph(&self, graph_name: EncodedTerm) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dspo_quads(Vec::default()) + } else { + self.gspo_quads(encode_term(graph_name)) + }) + } + + fn quads_for_subject_graph( + &self, + subject: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dspo_quads(encode_term(subject)) + } else { + self.gspo_quads(encode_term_pair(graph_name, subject)) + }) + } + + fn quads_for_subject_predicate_graph( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dspo_quads(encode_term_pair(subject, predicate)) + } else { + self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) + }) + } + + fn quads_for_subject_predicate_object_graph( + &self, + subject: EncodedTerm, + predicate: EncodedTerm, + object: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dspo_quads(encode_term_triple(subject, predicate, object)) + } else { + self.gspo_quads(encode_term_quad(graph_name, subject, predicate, object)) + }) + } + + fn quads_for_subject_object_graph( + &self, + subject: EncodedTerm, + object: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dosp_quads(encode_term_pair(object, subject)) + } else { + self.gosp_quads(encode_term_triple(graph_name, object, subject)) + }) + } + + fn quads_for_predicate_graph( + &self, + predicate: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dpos_quads(encode_term(predicate)) + } else { + self.gpos_quads(encode_term_pair(graph_name, predicate)) + }) + } + + fn quads_for_predicate_object_graph( + &self, + predicate: EncodedTerm, + object: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dpos_quads(encode_term_pair(predicate, object)) + } else { + self.gpos_quads(encode_term_triple(graph_name, predicate, object)) + }) + } + + fn quads_for_object_graph( + &self, + object: EncodedTerm, + graph_name: EncodedTerm, + ) -> ChainedDecodingQuadIterator { + ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() { + self.dosp_quads(encode_term(object)) + } else { + self.gosp_quads(encode_term_pair(graph_name, object)) + }) + } + + pub fn named_graphs(&self) -> DecodingGraphIterator { + DecodingGraphIterator { + iter: self.graphs.iter(), + } + } + + pub fn contains_named_graph(&self, graph_name: EncodedTerm) -> Result { + Ok(self.graphs.contains_key(&encode_term(graph_name))?) + } + + fn spog_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.spog, prefix, QuadEncoding::Spog) + } + + fn posg_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.posg, prefix, QuadEncoding::Posg) + } + + fn ospg_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.ospg, prefix, QuadEncoding::Ospg) + } + + fn gspo_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.gspo, prefix, QuadEncoding::Gspo) + } + + fn gpos_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.gpos, prefix, QuadEncoding::Gpos) + } + + fn gosp_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.gosp, prefix, QuadEncoding::Gosp) + } + + fn dspo_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.dspo, prefix, QuadEncoding::Dspo) + } + + fn dpos_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.dpos, prefix, QuadEncoding::Dpos) + } + + fn dosp_quads(&self, prefix: Vec) -> DecodingQuadIterator { + self.inner_quads(&self.dosp, prefix, QuadEncoding::Dosp) + } + + fn inner_quads( + &self, + tree: &Tree, + prefix: impl AsRef<[u8]>, + encoding: QuadEncoding, + ) -> DecodingQuadIterator { + DecodingQuadIterator { + iter: tree.scan_prefix(prefix), + encoding, + } + } + + pub fn insert(&self, quad: &EncodedQuad) -> Result<(), io::Error> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + if quad.graph_name.is_default_graph() { + write_spo_quad(&mut buffer, quad); + self.dspo.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_pos_quad(&mut buffer, quad); + self.dpos.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_osp_quad(&mut buffer, quad); + self.dosp.insert(buffer.as_slice(), &[])?; + buffer.clear(); + } else { + write_spog_quad(&mut buffer, quad); + self.spog.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.posg.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.ospg.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.gspo.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.gpos.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.gosp.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_term(&mut buffer, quad.graph_name); + self.graphs.insert(&buffer, &[])?; + buffer.clear(); + } + + Ok(()) + } + + pub fn remove(&self, quad: &EncodedQuad) -> Result<(), io::Error> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + if quad.graph_name.is_default_graph() { + write_spo_quad(&mut buffer, quad); + self.dspo.remove(buffer.as_slice())?; + buffer.clear(); + + write_pos_quad(&mut buffer, quad); + self.dpos.remove(buffer.as_slice())?; + buffer.clear(); + + write_osp_quad(&mut buffer, quad); + self.dosp.remove(buffer.as_slice())?; + buffer.clear(); + } else { + write_spog_quad(&mut buffer, quad); + self.spog.remove(buffer.as_slice())?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.posg.remove(buffer.as_slice())?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.ospg.remove(buffer.as_slice())?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.gspo.remove(buffer.as_slice())?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.gpos.remove(buffer.as_slice())?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.gosp.remove(buffer.as_slice())?; + buffer.clear(); + } + + Ok(()) + } + + pub fn insert_named_graph(&self, graph_name: EncodedTerm) -> Result<(), io::Error> { + self.graphs.insert(&encode_term(graph_name), &[])?; + Ok(()) + } + + pub fn clear_graph(&self, graph_name: EncodedTerm) -> Result<(), io::Error> { + if graph_name.is_default_graph() { + self.dspo.clear()?; + self.dpos.clear()?; + self.dosp.clear()?; + } else { + for quad in self.quads_for_graph(graph_name) { + self.remove(&quad?)?; + } + } + Ok(()) + } + + pub fn remove_named_graph(&self, graph_name: EncodedTerm) -> Result<(), io::Error> { + for quad in self.quads_for_graph(graph_name) { + self.remove(&quad?)?; + } + self.graphs.remove(&encode_term(graph_name))?; + Ok(()) + } + + pub fn clear(&self) -> Result<(), io::Error> { + self.dspo.clear()?; + self.dpos.clear()?; + self.dosp.clear()?; + self.gspo.clear()?; + self.gpos.clear()?; + self.gosp.clear()?; + self.spog.clear()?; + self.posg.clear()?; + self.ospg.clear()?; + self.graphs.clear()?; + self.id2str.clear()?; + Ok(()) + } + + pub fn get_str(&self, key: StrHash) -> Result, io::Error> { + self.id2str + .get(key.to_be_bytes())? + .map(|v| String::from_utf8(v.to_vec())) + .transpose() + .map_err(invalid_data_error) + } + + pub fn contains_str(&self, key: StrHash) -> Result { + Ok(self.id2str.contains_key(key.to_be_bytes())?) + } + + pub fn insert_str(&self, key: StrHash, value: &str) -> Result<(), io::Error> { + self.id2str.insert(key.to_be_bytes(), value)?; + Ok(()) + } +} + +pub struct ChainedDecodingQuadIterator { + first: DecodingQuadIterator, + second: Option, +} + +impl ChainedDecodingQuadIterator { + fn new(first: DecodingQuadIterator) -> Self { + Self { + first, + second: None, + } + } + + fn pair(first: DecodingQuadIterator, second: DecodingQuadIterator) -> Self { + Self { + first, + second: Some(second), + } + } +} + +impl Iterator for ChainedDecodingQuadIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + if let Some(result) = self.first.next() { + Some(result) + } else if let Some(second) = self.second.as_mut() { + second.next() + } else { + None + } + } +} + +pub struct DecodingQuadIterator { + iter: Iter, + encoding: QuadEncoding, +} + +impl Iterator for DecodingQuadIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + Some(match self.iter.next()? { + Ok((encoded, _)) => self.encoding.decode(&encoded), + Err(error) => Err(error.into()), + }) + } +} + +pub struct DecodingGraphIterator { + iter: Iter, +} + +impl Iterator for DecodingGraphIterator { + type Item = Result; + + fn next(&mut self) -> Option> { + Some(match self.iter.next()? { + Ok((encoded, _)) => decode_term(&encoded), + Err(error) => Err(error.into()), + }) + } +} + +pub struct StorageTransaction<'a> { + id2str: &'a TransactionalTree, + spog: &'a TransactionalTree, + posg: &'a TransactionalTree, + ospg: &'a TransactionalTree, + gspo: &'a TransactionalTree, + gpos: &'a TransactionalTree, + gosp: &'a TransactionalTree, + dspo: &'a TransactionalTree, + dpos: &'a TransactionalTree, + dosp: &'a TransactionalTree, + graphs: &'a TransactionalTree, +} + +impl<'a> StorageTransaction<'a> { + pub fn insert(&self, quad: &EncodedQuad) -> Result<(), SledUnabortableTransactionError> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + if quad.graph_name.is_default_graph() { + write_spo_quad(&mut buffer, quad); + self.dspo.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_pos_quad(&mut buffer, quad); + self.dpos.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_osp_quad(&mut buffer, quad); + self.dosp.insert(buffer.as_slice(), &[])?; + buffer.clear(); + } else { + write_spog_quad(&mut buffer, quad); + self.spog.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.posg.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.ospg.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.gspo.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.gpos.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.gosp.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_term(&mut buffer, quad.graph_name); + self.graphs.insert(buffer.as_slice(), &[])?; + buffer.clear(); + } + + Ok(()) + } + + pub fn remove(&self, quad: &EncodedQuad) -> Result<(), SledUnabortableTransactionError> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + if quad.graph_name.is_default_graph() { + write_spo_quad(&mut buffer, quad); + self.dspo.remove(buffer.as_slice())?; + buffer.clear(); + + write_pos_quad(&mut buffer, quad); + self.dpos.remove(buffer.as_slice())?; + buffer.clear(); + + write_osp_quad(&mut buffer, quad); + self.dosp.remove(buffer.as_slice())?; + buffer.clear(); + } else { + write_spog_quad(&mut buffer, quad); + self.spog.remove(buffer.as_slice())?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.posg.remove(buffer.as_slice())?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.ospg.remove(buffer.as_slice())?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.gspo.remove(buffer.as_slice())?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.gpos.remove(buffer.as_slice())?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.gosp.remove(buffer.as_slice())?; + buffer.clear(); + } + + Ok(()) + } + + pub fn insert_named_graph( + &self, + graph_name: EncodedTerm, + ) -> Result<(), SledUnabortableTransactionError> { + self.graphs.insert(encode_term(graph_name), &[])?; + Ok(()) + } + + pub fn get_str(&self, key: StrHash) -> Result, SledUnabortableTransactionError> { + self.id2str + .get(key.to_be_bytes())? + .map(|v| String::from_utf8(v.to_vec())) + .transpose() + .map_err(|e| SledUnabortableTransactionError::Storage(invalid_data_error(e))) + } + + pub fn contains_str(&self, key: StrHash) -> Result { + Ok(self.id2str.get(key.to_be_bytes())?.is_some()) + } + + pub fn insert_str( + &self, + key: StrHash, + value: &str, + ) -> Result<(), SledUnabortableTransactionError> { + self.id2str.insert(&key.to_be_bytes(), value)?; + Ok(()) + } +} + +/// Error returned by a Sled transaction +#[derive(Debug)] +pub enum SledTransactionError { + /// A failure returned by the API user that have aborted the transaction + Abort(T), + /// A storage related error + Storage(io::Error), +} + +impl fmt::Display for SledTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Abort(e) => e.fmt(f), + Self::Storage(e) => e.fmt(f), + } + } +} + +impl Error for SledTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Abort(e) => Some(e), + Self::Storage(e) => Some(e), + } + } +} + +impl From> for SledTransactionError { + fn from(e: TransactionError) -> Self { + match e { + TransactionError::Abort(e) => Self::Abort(e), + TransactionError::Storage(e) => Self::Storage(e.into()), + } + } +} + +impl> From> for io::Error { + fn from(e: SledTransactionError) -> Self { + match e { + SledTransactionError::Abort(e) => e.into(), + SledTransactionError::Storage(e) => e, + } + } +} + +/// An error returned from the transaction methods. +/// Should be returned as it is +#[derive(Debug)] +pub enum SledUnabortableTransactionError { + #[doc(hidden)] + Conflict, + /// A regular error + Storage(io::Error), +} + +impl fmt::Display for SledUnabortableTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Conflict => write!(f, "Transaction conflict"), + Self::Storage(e) => e.fmt(f), + } + } +} + +impl Error for SledUnabortableTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Storage(e) => Some(e), + _ => None, + } + } +} + +impl From for EvaluationError { + fn from(e: SledUnabortableTransactionError) -> Self { + match e { + SledUnabortableTransactionError::Storage(e) => Self::Io(e), + SledUnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +impl From> for SledUnabortableTransactionError { + fn from(e: StoreOrParseError) -> Self { + match e { + StoreOrParseError::Store(e) => e, + StoreOrParseError::Parse(e) => Self::Storage(e), + } + } +} + +impl From for SledUnabortableTransactionError { + fn from(e: UnabortableTransactionError) -> Self { + match e { + UnabortableTransactionError::Storage(e) => Self::Storage(e.into()), + UnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +/// An error returned from the transaction closure +#[derive(Debug)] +pub enum SledConflictableTransactionError { + /// A failure returned by the user that will abort the transaction + Abort(T), + #[doc(hidden)] + Conflict, + /// A storage related error + Storage(io::Error), +} + +impl fmt::Display for SledConflictableTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Conflict => write!(f, "Transaction conflict"), + Self::Storage(e) => e.fmt(f), + Self::Abort(e) => e.fmt(f), + } + } +} + +impl Error for SledConflictableTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Abort(e) => Some(e), + Self::Storage(e) => Some(e), + _ => None, + } + } +} + +impl From for SledConflictableTransactionError { + fn from(e: SledUnabortableTransactionError) -> Self { + match e { + SledUnabortableTransactionError::Storage(e) => Self::Storage(e), + SledUnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +impl From> for ConflictableTransactionError { + fn from(e: SledConflictableTransactionError) -> Self { + match e { + SledConflictableTransactionError::Abort(e) => ConflictableTransactionError::Abort(e), + SledConflictableTransactionError::Conflict => ConflictableTransactionError::Conflict, + SledConflictableTransactionError::Storage(e) => { + ConflictableTransactionError::Storage(e.into()) + } + } + } +}