Implements proper Sled transactions

pull/46/head
Tpt 4 years ago
parent 3d07160167
commit 69686c8710
  1. 5
      lib/src/sparql/error.rs
  2. 523
      lib/src/store/sled.rs

@ -19,6 +19,9 @@ pub enum EvaluationError {
Io(io::Error), Io(io::Error),
/// An error returned during the query evaluation itself /// An error returned during the query evaluation itself
Query(QueryError), Query(QueryError),
/// A conflict during a transaction
#[doc(hidden)]
Conflict,
} }
#[derive(Debug)] #[derive(Debug)]
@ -38,6 +41,7 @@ impl fmt::Display for EvaluationError {
Self::Parsing(error) => error.fmt(f), Self::Parsing(error) => error.fmt(f),
Self::Io(error) => error.fmt(f), Self::Io(error) => error.fmt(f),
Self::Query(error) => error.fmt(f), Self::Query(error) => error.fmt(f),
Self::Conflict => write!(f, "Transaction conflict"),
} }
} }
} }
@ -57,6 +61,7 @@ impl error::Error for EvaluationError {
Self::Parsing(e) => Some(e), Self::Parsing(e) => Some(e),
Self::Io(e) => Some(e), Self::Io(e) => Some(e),
Self::Query(e) => Some(e), Self::Query(e) => Some(e),
_ => None,
} }
} }
} }

@ -1,15 +1,21 @@
//! Store based on the [Sled](https://sled.rs/) key-value database. //! Store based on the [Sled](https://sled.rs/) key-value database.
use crate::error::{invalid_data_error, UnwrapInfallible}; use crate::error::invalid_data_error;
use crate::io::{DatasetFormat, GraphFormat}; use crate::io::{DatasetFormat, GraphFormat};
use crate::model::*; use crate::model::*;
use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore, dump_dataset, dump_graph, load_dataset, load_graph, ReadableEncodedStore, StoreOrParseError,
WritableEncodedStore,
}; };
use sled::{Batch, Config, Iter, Tree}; use sled::transaction::{
use std::convert::{Infallible, TryInto}; ConflictableTransactionError, TransactionError, Transactional, TransactionalTree,
UnabortableTransactionError,
};
use sled::{Config, Iter, Tree};
use std::convert::TryInto;
use std::error::Error;
use std::io::{BufRead, Cursor, Write}; use std::io::{BufRead, Cursor, Write};
use std::path::Path; use std::path::Path;
use std::{fmt, io, str}; use std::{fmt, io, str};
@ -149,16 +155,34 @@ impl SledStore {
/// The transaction is executed if the given closure returns `Ok`. /// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the closure returns `Err`. /// Nothing is done if the closure returns `Err`.
/// ///
/// See `MemoryStore` for a usage example. /// Usage example:
pub fn transaction<'a, E: From<io::Error>>( /// ```
&'a self, /// use oxigraph::SledStore;
f: impl FnOnce(&mut SledTransaction<'a>) -> Result<(), E>, /// use oxigraph::model::*;
) -> Result<(), E> { /// use oxigraph::store::sled::SledConflictableTransactionError;
let mut transaction = SledTransaction { /// use std::convert::Infallible;
inner: BatchWriter::new(self), ///
}; /// let store = SledStore::new()?;
f(&mut transaction)?; ///
Ok(transaction.inner.apply()?) /// let ex = NamedNode::new("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
///
/// // transaction
/// store.transaction(|transaction| {
/// transaction.insert(&quad)?;
/// Ok(()) as Result<(),SledConflictableTransactionError<Infallible>>
/// })?;
///
/// // quad filter
/// assert!(store.contains(&quad)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn transaction<T, E>(
&self,
f: impl Fn(SledTransaction<'_>) -> Result<T, SledConflictableTransactionError<E>>,
) -> Result<T, SledTransactionError<E>> {
Ok((&self.id2str, &self.quads)
.transaction(move |(id2str, quads)| Ok(f(SledTransaction { id2str, quads })?))?)
} }
/// Loads a graph file (i.e. triples) into the store /// Loads a graph file (i.e. triples) into the store
@ -178,13 +202,8 @@ impl SledStore {
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
load_graph( let mut this = self;
&mut DirectWriter::new(self), load_graph(&mut this, reader, format, to_graph_name, base_iri)?;
reader,
format,
to_graph_name,
base_iri,
)?;
Ok(()) Ok(())
} }
@ -204,21 +223,23 @@ impl SledStore {
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), io::Error> {
load_dataset(&mut DirectWriter::new(self), reader, format, base_iri)?; let mut this = self;
load_dataset(&mut this, reader, format, base_iri)?;
Ok(()) Ok(())
} }
/// Adds a quad to this store. /// Adds a quad to this store.
pub fn insert(&self, quad: &Quad) -> Result<(), io::Error> { pub fn insert(&self, quad: &Quad) -> Result<(), io::Error> {
let mut writer = DirectWriter::new(self); let mut this = self;
let quad = writer.encode_quad(quad)?; let quad = this.encode_quad(quad)?;
writer.insert_encoded(&quad) this.insert_encoded(&quad)
} }
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> { pub fn remove(&self, quad: &Quad) -> Result<(), io::Error> {
let mut this = self;
let quad = quad.into(); let quad = quad.into();
DirectWriter::new(self).remove_encoded(&quad) this.remove_encoded(&quad)
} }
/// Dumps a store graph into a file. /// Dumps a store graph into a file.
@ -470,181 +491,74 @@ impl ReadableEncodedStore for SledStore {
} }
} }
struct DirectWriter<'a> { impl<'a> WithStoreError for &'a SledStore {
store: &'a SledStore,
buffer: Vec<u8>,
}
impl<'a> DirectWriter<'a> {
fn new(store: &'a SledStore) -> Self {
Self {
store,
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1),
}
}
}
impl WithStoreError for DirectWriter<'_> {
type Error = io::Error; type Error = io::Error;
} }
impl<'a> StrContainer for DirectWriter<'a> { impl<'a> StrContainer for &'a SledStore {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<(), io::Error> { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<(), io::Error> {
self.store self.id2str.insert(key.to_be_bytes().as_ref(), value)?;
.id2str
.insert(key.to_be_bytes().as_ref(), value)?;
Ok(()) Ok(())
} }
} }
impl<'a> WritableEncodedStore for DirectWriter<'a> { impl<'a> WritableEncodedStore for &'a SledStore {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
Ok(()) write_spog_quad(&mut buffer, quad);
} self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
write_spog_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad); write_posg_quad(&mut buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?; self.quads.insert(buffer.as_slice(), &[])?;
self.buffer.clear(); buffer.clear();
write_gosp_quad(&mut self.buffer, quad); write_ospg_quad(&mut buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?; self.quads.insert(buffer.as_slice(), &[])?;
self.buffer.clear(); buffer.clear();
Ok(()) write_gspo_quad(&mut buffer, quad);
} self.quads.insert(buffer.as_slice(), &[])?;
} buffer.clear();
struct BatchWriter<'a> { write_gpos_quad(&mut buffer, quad);
store: &'a SledStore, self.quads.insert(buffer.as_slice(), &[])?;
quads: Batch, buffer.clear();
id2str: Batch,
buffer: Vec<u8>,
}
impl<'a> BatchWriter<'a> { write_gosp_quad(&mut buffer, quad);
fn new(store: &'a SledStore) -> Self { self.quads.insert(buffer.as_slice(), &[])?;
Self { buffer.clear();
store,
quads: Batch::default(),
id2str: Batch::default(),
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1),
}
}
}
impl<'a> BatchWriter<'a> {
fn apply(self) -> Result<(), io::Error> {
self.store.id2str.apply_batch(self.id2str)?;
self.store.quads.apply_batch(self.quads)?;
Ok(()) Ok(())
} }
}
impl WithStoreError for BatchWriter<'_> {
type Error = Infallible;
}
impl<'a> StrContainer for BatchWriter<'a> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<(), Infallible> { let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
self.id2str.insert(key.to_be_bytes().as_ref(), value);
Ok(())
}
}
impl<'a> WritableEncodedStore for BatchWriter<'a> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> {
write_spog_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
Ok(())
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { write_spog_quad(&mut buffer, quad);
write_spog_quad(&mut self.buffer, quad); self.quads.remove(buffer.as_slice())?;
self.quads.remove(self.buffer.as_slice()); buffer.clear();
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad); write_posg_quad(&mut buffer, quad);
self.quads.remove(self.buffer.as_slice()); self.quads.remove(buffer.as_slice())?;
self.buffer.clear(); buffer.clear();
write_ospg_quad(&mut self.buffer, quad); write_ospg_quad(&mut buffer, quad);
self.quads.remove(self.buffer.as_slice()); self.quads.remove(buffer.as_slice())?;
self.buffer.clear(); buffer.clear();
write_gspo_quad(&mut self.buffer, quad); write_gspo_quad(&mut buffer, quad);
self.quads.remove(self.buffer.as_slice()); self.quads.remove(buffer.as_slice())?;
self.buffer.clear(); buffer.clear();
write_gpos_quad(&mut self.buffer, quad); write_gpos_quad(&mut buffer, quad);
self.quads.remove(self.buffer.as_slice()); self.quads.remove(buffer.as_slice())?;
self.buffer.clear(); buffer.clear();
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut buffer, quad);
self.quads.remove(self.buffer.as_slice()); self.quads.remove(buffer.as_slice())?;
self.buffer.clear(); buffer.clear();
Ok(()) Ok(())
} }
@ -652,7 +566,8 @@ impl<'a> WritableEncodedStore for BatchWriter<'a> {
/// Allows inserting and deleting quads during a transaction with the `SeldStore`. /// Allows inserting and deleting quads during a transaction with the `SeldStore`.
pub struct SledTransaction<'a> { pub struct SledTransaction<'a> {
inner: BatchWriter<'a>, quads: &'a TransactionalTree,
id2str: &'a TransactionalTree,
} }
impl SledTransaction<'_> { impl SledTransaction<'_> {
@ -667,13 +582,14 @@ impl SledTransaction<'_> {
/// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind. /// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind.
/// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind. /// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind.
pub fn load_graph( pub fn load_graph(
&mut self, &self,
reader: impl BufRead, reader: impl BufRead,
format: GraphFormat, format: GraphFormat,
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), SledUnabortableTransactionError> {
load_graph(&mut self.inner, reader, format, to_graph_name, base_iri)?; let mut this = self;
load_graph(&mut this, reader, format, to_graph_name, base_iri)?;
Ok(()) Ok(())
} }
@ -688,25 +604,256 @@ impl SledTransaction<'_> {
/// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind. /// Errors related to parameter validation like the base IRI use the `INVALID_INPUT` error kind.
/// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind. /// Errors related to a bad syntax in the loaded file use the `INVALID_DATA` error kind.
pub fn load_dataset( pub fn load_dataset(
&mut self, &self,
reader: impl BufRead, reader: impl BufRead,
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<(), io::Error> { ) -> Result<(), SledUnabortableTransactionError> {
load_dataset(&mut self.inner, reader, format, base_iri)?; let mut this = self;
load_dataset(&mut this, reader, format, base_iri)?;
Ok(()) Ok(())
} }
/// Adds a quad to this store during the transaction. /// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) { pub fn insert(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> {
let quad = self.inner.encode_quad(quad).unwrap_infallible(); let mut this = self;
self.inner.insert_encoded(&quad).unwrap_infallible() let quad = this.encode_quad(quad)?;
this.insert_encoded(&quad)
} }
/// Removes a quad from this store during the transaction. /// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) { pub fn remove(&self, quad: &Quad) -> Result<(), SledUnabortableTransactionError> {
let mut this = self;
let quad = quad.into(); let quad = quad.into();
self.inner.remove_encoded(&quad).unwrap_infallible() this.remove_encoded(&quad)
}
}
impl<'a> WithStoreError for &'a SledTransaction<'a> {
type Error = SledUnabortableTransactionError;
}
impl<'a> StrContainer for &'a SledTransaction<'a> {
fn insert_str(
&mut self,
key: StrHash,
value: &str,
) -> Result<(), SledUnabortableTransactionError> {
self.id2str.insert(key.to_be_bytes().as_ref(), value)?;
Ok(())
}
}
impl<'a> WritableEncodedStore for &'a SledTransaction<'a> {
fn insert_encoded(
&mut self,
quad: &EncodedQuad,
) -> Result<(), SledUnabortableTransactionError> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
write_spog_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_posg_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
self.quads.insert(buffer.as_slice(), &[])?;
buffer.clear();
Ok(())
}
fn remove_encoded(
&mut self,
quad: &EncodedQuad,
) -> Result<(), SledUnabortableTransactionError> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
write_spog_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
write_posg_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
self.quads.remove(buffer.as_slice())?;
buffer.clear();
Ok(())
}
}
/// Error returned by a Sled transaction
#[derive(Debug)]
pub enum SledTransactionError<T> {
/// An failure returned by the API user that have aborted the transaction
Abort(T),
/// A storage related error
Storage(io::Error),
}
impl<T: fmt::Display> fmt::Display for SledTransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Abort(e) => e.fmt(f),
Self::Storage(e) => e.fmt(f),
}
}
}
impl<T: Error + 'static> Error for SledTransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Abort(e) => Some(e),
Self::Storage(e) => Some(e),
}
}
}
impl<T> From<TransactionError<T>> for SledTransactionError<T> {
fn from(e: TransactionError<T>) -> Self {
match e {
TransactionError::Abort(e) => Self::Abort(e),
TransactionError::Storage(e) => Self::Storage(e.into()),
}
}
}
/// An error returned from the transaction methods.
/// Should be returned as it is
#[derive(Debug)]
pub enum SledUnabortableTransactionError {
#[doc(hidden)]
Conflict,
/// A regular error
Storage(io::Error),
}
impl fmt::Display for SledUnabortableTransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Conflict => write!(f, "Transaction conflict"),
Self::Storage(e) => e.fmt(f),
}
}
}
impl Error for SledUnabortableTransactionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Storage(e) => Some(e),
_ => None,
}
}
}
impl From<SledUnabortableTransactionError> for EvaluationError {
fn from(e: SledUnabortableTransactionError) -> Self {
match e {
SledUnabortableTransactionError::Storage(e) => Self::Io(e),
SledUnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
impl From<StoreOrParseError<SledUnabortableTransactionError, io::Error>>
for SledUnabortableTransactionError
{
fn from(e: StoreOrParseError<SledUnabortableTransactionError, io::Error>) -> Self {
match e {
StoreOrParseError::Store(e) => e,
StoreOrParseError::Parse(e) => Self::Storage(e),
}
}
}
impl From<UnabortableTransactionError> for SledUnabortableTransactionError {
fn from(e: UnabortableTransactionError) -> Self {
match e {
UnabortableTransactionError::Storage(e) => Self::Storage(e.into()),
UnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
/// An error returned from the transaction closure
#[derive(Debug)]
pub enum SledConflictableTransactionError<T> {
/// A failure returned by the user that will abort the transaction
Abort(T),
#[doc(hidden)]
Conflict,
/// A storage related error
Storage(io::Error),
}
impl<T: fmt::Display> fmt::Display for SledConflictableTransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Conflict => write!(f, "Transaction conflict"),
Self::Storage(e) => e.fmt(f),
Self::Abort(e) => e.fmt(f),
}
}
}
impl<T: Error + 'static> Error for SledConflictableTransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Abort(e) => Some(e),
Self::Storage(e) => Some(e),
_ => None,
}
}
}
impl<T> From<SledUnabortableTransactionError> for SledConflictableTransactionError<T> {
fn from(e: SledUnabortableTransactionError) -> Self {
match e {
SledUnabortableTransactionError::Storage(e) => Self::Storage(e),
SledUnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
impl<T> From<SledConflictableTransactionError<T>> for ConflictableTransactionError<T> {
fn from(e: SledConflictableTransactionError<T>) -> Self {
match e {
SledConflictableTransactionError::Abort(e) => ConflictableTransactionError::Abort(e),
SledConflictableTransactionError::Conflict => ConflictableTransactionError::Conflict,
SledConflictableTransactionError::Storage(e) => {
ConflictableTransactionError::Storage(e.into())
}
}
} }
} }

Loading…
Cancel
Save