diff --git a/lib/src/sparql/error.rs b/lib/src/sparql/error.rs index 78228204..d42693c2 100644 --- a/lib/src/sparql/error.rs +++ b/lib/src/sparql/error.rs @@ -19,6 +19,9 @@ pub enum EvaluationError { Io(io::Error), /// An error returned during the query evaluation itself Query(QueryError), + /// A conflict during a transaction + #[doc(hidden)] + Conflict, } #[derive(Debug)] @@ -38,6 +41,7 @@ impl fmt::Display for EvaluationError { Self::Parsing(error) => error.fmt(f), Self::Io(error) => error.fmt(f), Self::Query(error) => error.fmt(f), + Self::Conflict => write!(f, "Transaction conflict"), } } } @@ -57,6 +61,7 @@ impl error::Error for EvaluationError { Self::Parsing(e) => Some(e), Self::Io(e) => Some(e), Self::Query(e) => Some(e), + _ => None, } } } diff --git a/lib/src/store/sled.rs b/lib/src/store/sled.rs index a91ca7c8..812d0eb4 100644 --- a/lib/src/store/sled.rs +++ b/lib/src/store/sled.rs @@ -1,15 +1,21 @@ //! Store based on the [Sled](https://sled.rs/) key-value database. -use crate::error::{invalid_data_error, UnwrapInfallible}; +use crate::error::invalid_data_error; use crate::io::{DatasetFormat, GraphFormat}; use crate::model::*; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::store::numeric_encoder::*; use crate::store::{ - dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore, + dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, StoreOrParseError, + WritableEncodedStore, }; -use sled::{Batch, Config, Iter, Tree}; -use std::convert::{Infallible, TryInto}; +use sled::transaction::{ + ConflictableTransactionError, TransactionError, Transactional, TransactionalTree, + UnabortableTransactionError, +}; +use sled::{Config, Iter, Tree}; +use std::convert::TryInto; +use std::error::Error; use std::io::{BufRead, Cursor, Write}; use std::path::Path; use std::{fmt, io, str}; @@ -149,16 +155,34 @@ impl SledStore { /// The transaction is executed if the given closure returns `Ok`. /// Nothing is done if the closure returns `Err`. /// - /// See `MemoryStore` for a usage example. - pub fn transaction<'a, E: From>( - &'a self, - f: impl FnOnce(&mut SledTransaction<'a>) -> Result<(), E>, - ) -> Result<(), E> { - let mut transaction = SledTransaction { - inner: BatchWriter::new(self), - }; - f(&mut transaction)?; - Ok(transaction.inner.apply()?) + /// Usage example: + /// ``` + /// use oxigraph::SledStore; + /// use oxigraph::model::*; + /// use oxigraph::store::sled::SledConflictableTransactionError; + /// use std::convert::Infallible; + /// + /// let store = SledStore::new()?; + /// + /// let ex = NamedNode::new("http://example.com")?; + /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); + /// + /// // transaction + /// store.transaction(|transaction| { + /// transaction.insert(&quad)?; + /// Ok(()) as Result<(),SledConflictableTransactionError> + /// })?; + /// + /// // quad filter + /// assert!(store.contains(&quad)?); + /// # Result::<_,Box>::Ok(()) + /// ``` + pub fn transaction( + &self, + f: impl Fn(SledTransaction<'_>) -> Result>, + ) -> Result> { + Ok((&self.id2str, &self.quads) + .transaction(move |(id2str, quads)| Ok(f(SledTransaction { id2str, quads })?))?) } /// Loads a graph file (i.e. triples) into the store @@ -178,13 +202,8 @@ impl SledStore { to_graph_name: &GraphName, base_iri: Option<&str>, ) -> Result<(), io::Error> { - load_graph( - &mut DirectWriter::new(self), - reader, - format, - to_graph_name, - base_iri, - )?; + let mut this = self; + load_graph(&mut this, reader, format, to_graph_name, base_iri)?; Ok(()) } @@ -204,21 +223,23 @@ impl SledStore { format: DatasetFormat, base_iri: Option<&str>, ) -> Result<(), io::Error> { - load_dataset(&mut DirectWriter::new(self), reader, format, base_iri)?; + let mut this = self; + load_dataset(&mut this, reader, format, base_iri)?; Ok(()) } /// Adds a quad to this store. pub fn insert(&self, quad: &Quad) -> Result<(), io::Error> { - let mut writer = DirectWriter::new(self); - let quad = writer.encode_quad(quad)?; - writer.insert_encoded(&quad) + let mut this = self; + let quad = this.encode_quad(quad)?; + this.insert_encoded(&quad) } /// Removes a quad from this store. pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> { + let mut this = self; let quad = quad.into(); - DirectWriter::new(self).remove_encoded(&quad) + this.remove_encoded(&quad) } /// Dumps a store graph into a file. @@ -470,181 +491,74 @@ impl ReadableEncodedStore for SledStore { } } -struct DirectWriter<'a> { - store: &'a SledStore, - buffer: Vec, -} - -impl<'a> DirectWriter<'a> { - fn new(store: &'a SledStore) -> Self { - Self { - store, - buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1), - } - } -} - -impl WithStoreError for DirectWriter<'_> { +impl<'a> WithStoreError for &'a SledStore { type Error = io::Error; } -impl<'a> StrContainer for DirectWriter<'a> { +impl<'a> StrContainer for &'a SledStore { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<(), io::Error> { - self.store - .id2str - .insert(key.to_be_bytes().as_ref(), value)?; + self.id2str.insert(key.to_be_bytes().as_ref(), value)?; Ok(()) } } -impl<'a> WritableEncodedStore for DirectWriter<'a> { +impl<'a> WritableEncodedStore for &'a SledStore { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { - write_spog_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); - - write_posg_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); - - write_ospg_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); - - write_gspo_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); - - write_gpos_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); - - write_gosp_quad(&mut self.buffer, quad); - self.store.quads.insert(self.buffer.as_slice(), &[])?; - self.buffer.clear(); + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - Ok(()) - } - - fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { - write_spog_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); - - write_posg_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); - - write_ospg_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); - - write_gspo_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); + write_spog_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); - write_gpos_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); + write_posg_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); - write_gosp_quad(&mut self.buffer, quad); - self.store.quads.remove(self.buffer.as_slice())?; - self.buffer.clear(); + write_ospg_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); - Ok(()) - } -} + write_gspo_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); -struct BatchWriter<'a> { - store: &'a SledStore, - quads: Batch, - id2str: Batch, - buffer: Vec, -} + write_gpos_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); -impl<'a> BatchWriter<'a> { - fn new(store: &'a SledStore) -> Self { - Self { - store, - quads: Batch::default(), - id2str: Batch::default(), - buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1), - } - } -} + write_gosp_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); -impl<'a> BatchWriter<'a> { - fn apply(self) -> Result<(), io::Error> { - self.store.id2str.apply_batch(self.id2str)?; - self.store.quads.apply_batch(self.quads)?; Ok(()) } -} - -impl WithStoreError for BatchWriter<'_> { - type Error = Infallible; -} -impl<'a> StrContainer for BatchWriter<'a> { - fn insert_str(&mut self, key: StrHash, value: &str) -> Result<(), Infallible> { - self.id2str.insert(key.to_be_bytes().as_ref(), value); - Ok(()) - } -} - -impl<'a> WritableEncodedStore for BatchWriter<'a> { - fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { - write_spog_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - write_posg_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - write_ospg_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - write_gspo_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - write_gpos_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - write_gosp_quad(&mut self.buffer, quad); - self.quads.insert(self.buffer.as_slice(), &[]); - self.buffer.clear(); - - Ok(()) - } + fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { - write_spog_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_spog_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); - write_posg_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_posg_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); - write_ospg_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_ospg_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); - write_gspo_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_gspo_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); - write_gpos_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_gpos_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); - write_gosp_quad(&mut self.buffer, quad); - self.quads.remove(self.buffer.as_slice()); - self.buffer.clear(); + write_gosp_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); Ok(()) } @@ -652,7 +566,8 @@ impl<'a> WritableEncodedStore for BatchWriter<'a> { /// Allows inserting and deleting quads during a transaction with the `SeldStore`. pub struct SledTransaction<'a> { - inner: BatchWriter<'a>, + quads: &'a TransactionalTree, + id2str: &'a TransactionalTree, } impl SledTransaction<'_> { @@ -667,13 +582,14 @@ impl SledTransaction<'_> { /// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind. /// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind. pub fn load_graph( - &mut self, + &self, reader: impl BufRead, format: GraphFormat, to_graph_name: &GraphName, base_iri: Option<&str>, - ) -> Result<(), io::Error> { - load_graph(&mut self.inner, reader, format, to_graph_name, base_iri)?; + ) -> Result<(), SledUnabortableTransactionError> { + let mut this = self; + load_graph(&mut this, reader, format, to_graph_name, base_iri)?; Ok(()) } @@ -688,25 +604,256 @@ impl SledTransaction<'_> { /// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind. /// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind. pub fn load_dataset( - &mut self, + &self, reader: impl BufRead, format: DatasetFormat, base_iri: Option<&str>, - ) -> Result<(), io::Error> { - load_dataset(&mut self.inner, reader, format, base_iri)?; + ) -> Result<(), SledUnabortableTransactionError> { + let mut this = self; + load_dataset(&mut this, reader, format, base_iri)?; Ok(()) } /// Adds a quad to this store during the transaction. - pub fn insert(&mut self, quad: &Quad) { - let quad = self.inner.encode_quad(quad).unwrap_infallible(); - self.inner.insert_encoded(&quad).unwrap_infallible() + pub fn insert(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> { + let mut this = self; + let quad = this.encode_quad(quad)?; + this.insert_encoded(&quad) } /// Removes a quad from this store during the transaction. - pub fn remove(&mut self, quad: &Quad) { + pub fn remove(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> { + let mut this = self; let quad = quad.into(); - self.inner.remove_encoded(&quad).unwrap_infallible() + this.remove_encoded(&quad) + } +} + +impl<'a> WithStoreError for &'a SledTransaction<'a> { + type Error = SledUnabortableTransactionError; +} + +impl<'a> StrContainer for &'a SledTransaction<'a> { + fn insert_str( + &mut self, + key: StrHash, + value: &str, + ) -> Result<(), SledUnabortableTransactionError> { + self.id2str.insert(key.to_be_bytes().as_ref(), value)?; + Ok(()) + } +} + +impl<'a> WritableEncodedStore for &'a SledTransaction<'a> { + fn insert_encoded( + &mut self, + quad: &EncodedQuad, + ) -> Result<(), SledUnabortableTransactionError> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + write_spog_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.quads.insert(buffer.as_slice(), &[])?; + buffer.clear(); + + Ok(()) + } + + fn remove_encoded( + &mut self, + quad: &EncodedQuad, + ) -> Result<(), SledUnabortableTransactionError> { + let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + + write_spog_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + write_posg_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + write_ospg_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + write_gspo_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + write_gpos_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + write_gosp_quad(&mut buffer, quad); + self.quads.remove(buffer.as_slice())?; + buffer.clear(); + + Ok(()) + } +} + +/// Error returned by a Sled transaction +#[derive(Debug)] +pub enum SledTransactionError { + /// An failure returned by the API user that have aborted the transaction + Abort(T), + /// A storage related error + Storage(io::Error), +} + +impl fmt::Display for SledTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Abort(e) => e.fmt(f), + Self::Storage(e) => e.fmt(f), + } + } +} + +impl Error for SledTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Abort(e) => Some(e), + Self::Storage(e) => Some(e), + } + } +} + +impl From> for SledTransactionError { + fn from(e: TransactionError) -> Self { + match e { + TransactionError::Abort(e) => Self::Abort(e), + TransactionError::Storage(e) => Self::Storage(e.into()), + } + } +} + +/// An error returned from the transaction methods. +/// Should be returned as it is +#[derive(Debug)] +pub enum SledUnabortableTransactionError { + #[doc(hidden)] + Conflict, + /// A regular error + Storage(io::Error), +} + +impl fmt::Display for SledUnabortableTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Conflict => write!(f, "Transaction conflict"), + Self::Storage(e) => e.fmt(f), + } + } +} + +impl Error for SledUnabortableTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Storage(e) => Some(e), + _ => None, + } + } +} + +impl From for EvaluationError { + fn from(e: SledUnabortableTransactionError) -> Self { + match e { + SledUnabortableTransactionError::Storage(e) => Self::Io(e), + SledUnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +impl From> + for SledUnabortableTransactionError +{ + fn from(e: StoreOrParseError) -> Self { + match e { + StoreOrParseError::Store(e) => e, + StoreOrParseError::Parse(e) => Self::Storage(e), + } + } +} + +impl From for SledUnabortableTransactionError { + fn from(e: UnabortableTransactionError) -> Self { + match e { + UnabortableTransactionError::Storage(e) => Self::Storage(e.into()), + UnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +/// An error returned from the transaction closure +#[derive(Debug)] +pub enum SledConflictableTransactionError { + /// A failure returned by the user that will abort the transaction + Abort(T), + #[doc(hidden)] + Conflict, + /// A storage related error + Storage(io::Error), +} + +impl fmt::Display for SledConflictableTransactionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Conflict => write!(f, "Transaction conflict"), + Self::Storage(e) => e.fmt(f), + Self::Abort(e) => e.fmt(f), + } + } +} + +impl Error for SledConflictableTransactionError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::Abort(e) => Some(e), + Self::Storage(e) => Some(e), + _ => None, + } + } +} + +impl From for SledConflictableTransactionError { + fn from(e: SledUnabortableTransactionError) -> Self { + match e { + SledUnabortableTransactionError::Storage(e) => Self::Storage(e), + SledUnabortableTransactionError::Conflict => Self::Conflict, + } + } +} + +impl From> for ConflictableTransactionError { + fn from(e: SledConflictableTransactionError) -> Self { + match e { + SledConflictableTransactionError::Abort(e) => ConflictableTransactionError::Abort(e), + SledConflictableTransactionError::Conflict => ConflictableTransactionError::Conflict, + SledConflictableTransactionError::Storage(e) => { + ConflictableTransactionError::Storage(e.into()) + } + } } }