Avoids abusing std::io::Error

Introduces new specific errors and makes them implement Into<io::Error> for compatibility
pull/190/head
Tpt 3 years ago
parent ec850b8ed4
commit a33dbb6d06
  1. 156
      lib/src/io/read.rs
  2. 9
      lib/src/model/dataset.rs
  3. 3
      lib/src/model/graph.rs
  4. 41
      lib/src/sophia.rs
  5. 11
      lib/src/sparql/dataset.rs
  6. 41
      lib/src/sparql/error.rs
  7. 37
      lib/src/sparql/update.rs
  8. 215
      lib/src/storage/backend/rocksdb.rs
  9. 59
      lib/src/storage/binary_encoder.rs
  10. 206
      lib/src/storage/error.rs
  11. 135
      lib/src/storage/mod.rs
  12. 144
      lib/src/storage/numeric_encoder.rs
  13. 92
      lib/src/store.rs
  14. 30
      lib/tests/store.rs
  15. 25
      python/src/io.rs
  16. 7
      python/src/sparql.rs
  17. 59
      python/src/store.rs
  18. 14
      server/src/main.rs
  19. 9
      spargebra/src/parser.rs

@ -5,11 +5,12 @@ use crate::model::*;
use oxiri::{Iri, IriParseError};
use rio_api::model as rio;
use rio_api::parser::{QuadsParser, TriplesParser};
use rio_turtle::{NQuadsParser, NTriplesParser, TriGParser, TurtleParser};
use rio_xml::RdfXmlParser;
use rio_turtle::{NQuadsParser, NTriplesParser, TriGParser, TurtleError, TurtleParser};
use rio_xml::{RdfXmlError, RdfXmlParser};
use std::collections::HashMap;
use std::io;
use std::error::Error;
use std::io::BufRead;
use std::{fmt, io};
/// Parsers for RDF graph serialization formats.
///
@ -67,7 +68,7 @@ impl GraphParser {
/// Executes the parsing itself on a [`BufRead`](std::io::BufRead) implementation and returns an iterator of triples
#[allow(clippy::unnecessary_wraps)]
pub fn read_triples<R: BufRead>(&self, reader: R) -> io::Result<TripleReader<R>> {
pub fn read_triples<R: BufRead>(&self, reader: R) -> Result<TripleReader<R>, ParserError> {
Ok(TripleReader {
mapper: RioMapper::default(),
parser: match self.format {
@ -114,9 +115,9 @@ enum TripleReaderKind<R: BufRead> {
}
impl<R: BufRead> Iterator for TripleReader<R> {
type Item = io::Result<Triple>;
type Item = Result<Triple, ParserError>;
fn next(&mut self) -> Option<io::Result<Triple>> {
fn next(&mut self) -> Option<Result<Triple, ParserError>> {
loop {
if let Some(r) = self.buffer.pop() {
return Some(Ok(r));
@ -144,9 +145,9 @@ impl<R: BufRead> TripleReader<R> {
parser: &mut P,
buffer: &mut Vec<Triple>,
mapper: &mut RioMapper,
) -> Option<io::Result<()>>
) -> Option<Result<(), ParserError>>
where
io::Error: From<P::Error>,
ParserError: From<P::Error>,
{
if parser.is_end() {
None
@ -216,7 +217,7 @@ impl DatasetParser {
/// Executes the parsing itself on a [`BufRead`](std::io::BufRead) implementation and returns an iterator of quads
#[allow(clippy::unnecessary_wraps)]
pub fn read_quads<R: BufRead>(&self, reader: R) -> io::Result<QuadReader<R>> {
pub fn read_quads<R: BufRead>(&self, reader: R) -> Result<QuadReader<R>, ParserError> {
Ok(QuadReader {
mapper: RioMapper::default(),
parser: match self.format {
@ -259,9 +260,9 @@ enum QuadReaderKind<R: BufRead> {
}
impl<R: BufRead> Iterator for QuadReader<R> {
type Item = io::Result<Quad>;
type Item = Result<Quad, ParserError>;
fn next(&mut self) -> Option<io::Result<Quad>> {
fn next(&mut self) -> Option<Result<Quad, ParserError>> {
loop {
if let Some(r) = self.buffer.pop() {
return Some(Ok(r));
@ -286,9 +287,9 @@ impl<R: BufRead> QuadReader<R> {
parser: &mut P,
buffer: &mut Vec<Quad>,
mapper: &mut RioMapper,
) -> Option<io::Result<()>>
) -> Option<Result<(), ParserError>>
where
io::Error: From<P::Error>,
ParserError: From<P::Error>,
{
if parser.is_end() {
None
@ -374,3 +375,132 @@ impl<'a> RioMapper {
}
}
}
/// Error returned during RDF format parsing.
#[derive(Debug)]
pub enum ParserError {
    /// I/O error during parsing (file not found...).
    Io(io::Error),
    /// An error in the file syntax.
    Syntax(SyntaxError),
}
impl ParserError {
    /// Builds the syntax error raised when a user-provided base IRI is itself invalid.
    ///
    /// `iri` is the offending base IRI, `error` the parsing failure reported by oxiri.
    pub(crate) fn invalid_base_iri(iri: &str, error: IriParseError) -> Self {
        Self::Syntax(SyntaxError {
            inner: SyntaxErrorKind::BaseIri {
                iri: iri.to_owned(),
                error,
            },
        })
    }
}
impl fmt::Display for ParserError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Io(e) => e.fmt(f),
Self::Syntax(e) => e.fmt(f),
}
}
}
impl Error for ParserError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Io(e) => Some(e),
Self::Syntax(e) => Some(e),
}
}
}
#[allow(clippy::fallible_impl_from)]
impl From<TurtleError> for ParserError {
    fn from(error: TurtleError) -> Self {
        // We go through rio_turtle's io::Error conversion: if the resulting
        // io::Error still carries a TurtleError payload, it was a syntax error;
        // otherwise it is a genuine I/O failure.
        let error = io::Error::from(error);
        if error.get_ref().map_or(false, |e| e.is::<TurtleError>()) {
            // The unwrap/downcast cannot fail here: get_ref() just proved the
            // inner error exists and is a TurtleError.
            Self::Syntax(SyntaxError {
                inner: SyntaxErrorKind::Turtle(*error.into_inner().unwrap().downcast().unwrap()),
            })
        } else {
            Self::Io(error)
        }
    }
}
#[allow(clippy::fallible_impl_from)]
impl From<RdfXmlError> for ParserError {
    fn from(error: RdfXmlError) -> Self {
        // Same trick as the TurtleError conversion above: rio_xml's io::Error
        // conversion keeps an RdfXmlError payload for syntax errors, so we
        // downcast to tell syntax errors apart from real I/O failures.
        let error = io::Error::from(error);
        if error.get_ref().map_or(false, |e| e.is::<RdfXmlError>()) {
            // Safe: get_ref() just proved the inner error is an RdfXmlError.
            Self::Syntax(SyntaxError {
                inner: SyntaxErrorKind::RdfXml(*error.into_inner().unwrap().downcast().unwrap()),
            })
        } else {
            Self::Io(error)
        }
    }
}
impl From<io::Error> for ParserError {
fn from(error: io::Error) -> Self {
Self::Io(error)
}
}
impl From<ParserError> for io::Error {
    /// Converts back to `io::Error` for backward compatibility: the `Io`
    /// variant is unwrapped as-is, syntax errors use their own conversion.
    fn from(error: ParserError) -> Self {
        match error {
            ParserError::Syntax(e) => e.into(),
            ParserError::Io(e) => e,
        }
    }
}
/// An error in the syntax of the parsed file
#[derive(Debug)]
pub struct SyntaxError {
    // Concrete cause, kept private: the parser backends are an implementation detail.
    inner: SyntaxErrorKind,
}
#[derive(Debug)]
enum SyntaxErrorKind {
    /// Error reported by the rio_turtle parsers (N-Triples, N-Quads, Turtle, TriG).
    Turtle(TurtleError),
    /// Error reported by the rio_xml RDF/XML parser.
    RdfXml(RdfXmlError),
    /// The user-provided base IRI could not be parsed as an IRI.
    BaseIri { iri: String, error: IriParseError },
}
impl fmt::Display for SyntaxError {
    /// Parser errors print their own message; an invalid base IRI gets a
    /// dedicated message including the offending IRI.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.inner {
            SyntaxErrorKind::BaseIri { iri, error } => {
                write!(f, "Invalid base IRI '{}': {}", iri, error)
            }
            SyntaxErrorKind::Turtle(e) => write!(f, "{}", e),
            SyntaxErrorKind::RdfXml(e) => write!(f, "{}", e),
        }
    }
}
impl Error for SyntaxError {
    /// Exposes the underlying parser error, if any; invalid-base-IRI errors
    /// carry no further source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match &self.inner {
            SyntaxErrorKind::BaseIri { .. } => None,
            SyntaxErrorKind::Turtle(e) => Some(e),
            SyntaxErrorKind::RdfXml(e) => Some(e),
        }
    }
}
impl From<SyntaxError> for io::Error {
    /// Converts the syntax error into an [`io::Error`] for backward compatibility.
    ///
    /// Parser errors reuse the conversions provided by rio; an invalid base IRI
    /// becomes an `InvalidInput` error.
    fn from(error: SyntaxError) -> Self {
        match error.inner {
            SyntaxErrorKind::Turtle(error) => error.into(),
            SyntaxErrorKind::RdfXml(error) => error.into(),
            SyntaxErrorKind::BaseIri { iri, error } => Self::new(
                io::ErrorKind::InvalidInput,
                // Keep the wording consistent with SyntaxError's Display impl
                // and ParserError::invalid_base_iri, which both say "base IRI".
                format!("Invalid base IRI '{}': {}", iri, error),
            ),
        }
    }
}

@ -23,6 +23,7 @@
//!
//! See also [`Graph`](super::Graph) if you only care about plain triples.
use crate::io::read::ParserError;
use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
@ -434,12 +435,12 @@ impl Dataset {
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), ParserError> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
for t in parser.read_quads(reader)? {
self.insert(&t?);
@ -1400,12 +1401,12 @@ impl<'a> GraphViewMut<'a> {
reader: impl BufRead,
format: GraphFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), ParserError> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
for t in parser.read_triples(reader)? {
self.insert(&t?);

@ -19,6 +19,7 @@
//!
//! See also [`Dataset`](super::Dataset) if you want to get support of multiple RDF graphs at the same time.
use crate::io::read::ParserError;
use crate::io::GraphFormat;
use crate::model::dataset::*;
use crate::model::*;
@ -207,7 +208,7 @@ impl Graph {
reader: impl BufRead,
format: GraphFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), ParserError> {
self.graph_mut().load(reader, format, base_iri)
}

@ -4,7 +4,7 @@ use crate::model::{
BlankNodeRef, GraphName, GraphNameRef, LiteralRef, NamedNodeRef, Quad, QuadRef, Subject,
SubjectRef, Term, TermRef,
};
use crate::store::Store;
use crate::store::{StorageError, Store};
use sophia_api::dataset::{
CollectibleDataset, DQuadSource, DResultTermSet, DTerm, Dataset, MdResult, MutableDataset,
};
@ -13,7 +13,6 @@ use sophia_api::quad::streaming_mode::{ByValue, StreamedQuad};
use sophia_api::term::{TTerm, TermKind};
use std::collections::HashSet;
use std::hash::Hash;
use std::io::Error;
use std::iter::empty;
type SophiaQuad = ([Term; 3], Option<Term>);
@ -21,10 +20,10 @@ type StreamedSophiaQuad<'a> = StreamedQuad<'a, ByValue<SophiaQuad>>;
impl Dataset for Store {
type Quad = ByValue<SophiaQuad>;
type Error = Error;
type Error = StorageError;
fn quads(&self) -> DQuadSource<'_, Self> {
Box::new(self.iter().map(io_quad_map))
Box::new(self.iter().map(quad_map))
}
fn quads_with_s<'s, TS>(&'s self, s: &'s TS) -> DQuadSource<'s, Self>
@ -36,7 +35,7 @@ impl Dataset for Store {
if s.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, None, None, None).map(io_quad_map))
Box::new(self.quads_for_pattern(s, None, None, None).map(quad_map))
}
}
fn quads_with_p<'s, TP>(&'s self, p: &'s TP) -> DQuadSource<'s, Self>
@ -48,7 +47,7 @@ impl Dataset for Store {
if p.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, p, None, None).map(io_quad_map))
Box::new(self.quads_for_pattern(None, p, None, None).map(quad_map))
}
}
fn quads_with_o<'s, TS>(&'s self, o: &'s TS) -> DQuadSource<'s, Self>
@ -60,7 +59,7 @@ impl Dataset for Store {
if o.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, None, o, None).map(io_quad_map))
Box::new(self.quads_for_pattern(None, None, o, None).map(quad_map))
}
}
fn quads_with_g<'s, TS>(&'s self, g: Option<&'s TS>) -> DQuadSource<'s, Self>
@ -72,7 +71,7 @@ impl Dataset for Store {
if g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, None, None, g).map(io_quad_map))
Box::new(self.quads_for_pattern(None, None, None, g).map(quad_map))
}
}
fn quads_with_sp<'s, TS, TP>(&'s self, s: &'s TS, p: &'s TP) -> DQuadSource<'s, Self>
@ -87,7 +86,7 @@ impl Dataset for Store {
if s.is_none() || p.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, p, None, None).map(io_quad_map))
Box::new(self.quads_for_pattern(s, p, None, None).map(quad_map))
}
}
fn quads_with_so<'s, TS, TO>(&'s self, s: &'s TS, o: &'s TO) -> DQuadSource<'s, Self>
@ -102,7 +101,7 @@ impl Dataset for Store {
if s.is_none() || o.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, None, o, None).map(io_quad_map))
Box::new(self.quads_for_pattern(s, None, o, None).map(quad_map))
}
}
fn quads_with_sg<'s, TS, TG>(&'s self, s: &'s TS, g: Option<&'s TG>) -> DQuadSource<'s, Self>
@ -117,7 +116,7 @@ impl Dataset for Store {
if s.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, None, None, g).map(io_quad_map))
Box::new(self.quads_for_pattern(s, None, None, g).map(quad_map))
}
}
fn quads_with_po<'s, TP, TO>(&'s self, p: &'s TP, o: &'s TO) -> DQuadSource<'s, Self>
@ -132,7 +131,7 @@ impl Dataset for Store {
if p.is_none() || o.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, p, o, None).map(io_quad_map))
Box::new(self.quads_for_pattern(None, p, o, None).map(quad_map))
}
}
fn quads_with_pg<'s, TP, TG>(&'s self, p: &'s TP, g: Option<&'s TG>) -> DQuadSource<'s, Self>
@ -147,7 +146,7 @@ impl Dataset for Store {
if p.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, p, None, g).map(io_quad_map))
Box::new(self.quads_for_pattern(None, p, None, g).map(quad_map))
}
}
fn quads_with_og<'s, TO, TG>(&'s self, o: &'s TO, g: Option<&'s TG>) -> DQuadSource<'s, Self>
@ -162,7 +161,7 @@ impl Dataset for Store {
if o.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, None, o, g).map(io_quad_map))
Box::new(self.quads_for_pattern(None, None, o, g).map(quad_map))
}
}
fn quads_with_spo<'s, TS, TP, TO>(
@ -185,7 +184,7 @@ impl Dataset for Store {
if s.is_none() || p.is_none() || o.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, p, o, None).map(io_quad_map))
Box::new(self.quads_for_pattern(s, p, o, None).map(quad_map))
}
}
fn quads_with_spg<'s, TS, TP, TG>(
@ -208,7 +207,7 @@ impl Dataset for Store {
if s.is_none() || p.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, p, None, g).map(io_quad_map))
Box::new(self.quads_for_pattern(s, p, None, g).map(quad_map))
}
}
fn quads_with_sog<'s, TS, TO, TG>(
@ -231,7 +230,7 @@ impl Dataset for Store {
if s.is_none() || o.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, None, o, g).map(io_quad_map))
Box::new(self.quads_for_pattern(s, None, o, g).map(quad_map))
}
}
fn quads_with_pog<'s, TP, TO, TG>(
@ -254,7 +253,7 @@ impl Dataset for Store {
if p.is_none() || o.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(None, p, o, g).map(io_quad_map))
Box::new(self.quads_for_pattern(None, p, o, g).map(quad_map))
}
}
fn quads_with_spog<'s, TS, TP, TO, TG>(
@ -281,7 +280,7 @@ impl Dataset for Store {
if s.is_none() || p.is_none() || o.is_none() || g.is_none() {
Box::new(empty())
} else {
Box::new(self.quads_for_pattern(s, p, o, g).map(io_quad_map))
Box::new(self.quads_for_pattern(s, p, o, g).map(quad_map))
}
}
fn subjects(&self) -> DResultTermSet<Self>
@ -377,7 +376,7 @@ impl Dataset for Store {
}
impl MutableDataset for Store {
type MutationError = Error;
type MutationError = StorageError;
fn insert<TS, TP, TO, TG>(
&mut self,
s: &TS,
@ -438,7 +437,7 @@ impl CollectibleDataset for Store {
}
// helper functions
fn io_quad_map<'a>(res: Result<Quad, Error>) -> Result<StreamedSophiaQuad<'a>, Error> {
fn quad_map<'a>(res: Result<Quad, StorageError>) -> Result<StreamedSophiaQuad<'a>, StorageError> {
res.map(|q| {
let q: SophiaQuad = q.into();
StreamedQuad::by_value(q)

@ -2,11 +2,10 @@ use crate::model::TermRef;
use crate::sparql::algebra::QueryDataset;
use crate::sparql::EvaluationError;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use crate::storage::StorageReader;
use crate::storage::{StorageError, StorageReader};
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Infallible;
use std::iter::empty;
pub struct DatasetView {
@ -141,7 +140,7 @@ impl DatasetView {
pub fn encode_term<'a>(&self, term: impl Into<TermRef<'a>>) -> EncodedTerm {
let term = term.into();
let encoded = term.into();
insert_term::<Infallible, _>(term, &encoded, &mut |key, value| {
insert_term(term, &encoded, &mut |key, value| {
self.insert_str(key, value);
Ok(())
})
@ -159,9 +158,7 @@ impl DatasetView {
}
impl StrLookup for DatasetView {
type Error = EvaluationError;
fn get_str(&self, key: &StrHash) -> Result<Option<String>, EvaluationError> {
fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(if let Some(value) = self.extra.borrow().get(key) {
Some(value.clone())
} else {
@ -169,7 +166,7 @@ impl StrLookup for DatasetView {
})
}
fn contains_str(&self, key: &StrHash) -> Result<bool, EvaluationError> {
fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
Ok(self.extra.borrow().contains_key(key) || self.reader.contains_str(key)?)
}
}

@ -1,6 +1,6 @@
use crate::error::invalid_data_error;
use crate::io::read::ParserError;
use crate::sparql::ParseError;
use crate::storage::numeric_encoder::DecoderError;
use crate::storage::StorageError;
use std::convert::Infallible;
use std::error;
use std::fmt;
@ -10,14 +10,16 @@ use std::io;
#[derive(Debug)]
#[non_exhaustive]
pub enum EvaluationError {
/// An error in SPARQL query parsing
/// An error in SPARQL parsing
Parsing(ParseError),
/// An error from the storage
Storage(StorageError),
/// An error while parsing an external RDF file
ExternalParser(ParserError),
/// An error returned during store IOs or during results write
Io(io::Error),
/// An error returned during the query evaluation itself
Query(QueryError),
#[doc(hidden)]
Extra,
}
#[derive(Debug)]
@ -35,9 +37,10 @@ impl fmt::Display for EvaluationError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Parsing(error) => error.fmt(f),
Self::Storage(error) => error.fmt(f),
Self::ExternalParser(error) => error.fmt(f),
Self::Io(error) => error.fmt(f),
Self::Query(error) => error.fmt(f),
Self::Extra => write!(f, "Unknown error"),
}
}
}
@ -55,9 +58,10 @@ impl error::Error for EvaluationError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
Self::Parsing(e) => Some(e),
Self::Storage(e) => Some(e),
Self::ExternalParser(e) => Some(e),
Self::Io(e) => Some(e),
Self::Query(e) => Some(e),
_ => None,
}
}
}
@ -99,17 +103,32 @@ impl From<ParseError> for EvaluationError {
}
}
impl From<StorageError> for EvaluationError {
fn from(error: StorageError) -> Self {
Self::Storage(error)
}
}
impl From<io::Error> for EvaluationError {
fn from(error: io::Error) -> Self {
Self::Io(error)
}
}
impl<E: Into<Self>> From<DecoderError<E>> for EvaluationError {
fn from(error: DecoderError<E>) -> Self {
impl From<ParserError> for EvaluationError {
fn from(error: ParserError) -> Self {
Self::ExternalParser(error)
}
}
impl From<EvaluationError> for io::Error {
fn from(error: EvaluationError) -> Self {
match error {
DecoderError::Store(error) => error.into(),
DecoderError::Decoder { msg } => invalid_data_error(msg).into(),
EvaluationError::Parsing(error) => Self::new(io::ErrorKind::InvalidData, error),
EvaluationError::ExternalParser(error) => error.into(),
EvaluationError::Io(error) => error,
EvaluationError::Storage(error) => error.into(),
EvaluationError::Query(error) => Self::new(io::ErrorKind::Other, error),
}
}
}

@ -23,7 +23,7 @@ use spargebra::term::{
};
use spargebra::GraphUpdateOperation;
use std::collections::HashMap;
use std::io::{BufReader, Error, ErrorKind};
use std::io::BufReader;
use std::rc::Rc;
pub fn evaluate_update(
@ -32,31 +32,16 @@ pub fn evaluate_update(
options: UpdateOptions,
) -> Result<(), EvaluationError> {
let base_iri = update.inner.base_iri.map(Rc::new);
storage
.transaction(move |transaction| {
let client = Client::new(options.query_options.http_timeout);
SimpleUpdateEvaluator {
transaction,
base_iri: base_iri.clone(),
options: options.clone(),
client,
}
.eval_all(&update.inner.operations, &update.using_datasets)
.map_err(|e| match e {
EvaluationError::Io(e) => e,
q => Error::new(ErrorKind::Other, q),
})
})
.map_err(|e| {
if e.get_ref()
.map_or(false, |inner| inner.is::<EvaluationError>())
{
*e.into_inner().unwrap().downcast().unwrap()
} else {
EvaluationError::Io(e)
}
})?;
Ok(())
storage.transaction(move |transaction| {
let client = Client::new(options.query_options.http_timeout);
SimpleUpdateEvaluator {
transaction,
base_iri: base_iri.clone(),
options: options.clone(),
client,
}
.eval_all(&update.inner.operations, &update.using_datasets)
})
}
struct SimpleUpdateEvaluator<'a> {

@ -5,6 +5,8 @@
#![allow(unsafe_code)]
use crate::error::invalid_input_error;
use crate::storage::error::StorageError;
use crate::store::CorruptionError;
use lazy_static::lazy_static;
use libc::{self, c_char, c_void, free};
use oxrocksdb_sys::*;
@ -12,10 +14,11 @@ use rand::random;
use std::borrow::Borrow;
use std::collections::HashMap;
use std::env::temp_dir;
use std::error::Error;
use std::ffi::{CStr, CString};
use std::fmt;
use std::fs::remove_dir_all;
use std::io::{Error, ErrorKind, Result};
use std::io;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::{Path, PathBuf};
@ -43,7 +46,11 @@ macro_rules! ffi_result_impl {
string: ptr::null()
};
let result = $($function)::*($($arg,)* &mut status);
convert_status(status).map(move |_| result)
if status.code == rocksdb_status_code_t_rocksdb_status_code_ok {
Ok(result)
} else {
Err(ErrorStatus(status))
}
}}
}
@ -123,7 +130,7 @@ impl Drop for DbHandler {
}
impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> {
let path = if cfg!(target_os = "linux") {
"/dev/shm/".into()
} else {
@ -133,7 +140,10 @@ impl Db {
Ok(Self(Arc::new(Self::do_open(path, column_families, true)?)))
}
pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
pub fn open(
path: &Path,
column_families: Vec<ColumnFamilyDefinition>,
) -> Result<Self, StorageError> {
Ok(Self(Arc::new(Self::do_open(
path.to_owned(),
column_families,
@ -145,7 +155,7 @@ impl Db {
path: PathBuf,
mut column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool,
) -> Result<DbHandler> {
) -> Result<DbHandler, StorageError> {
let c_path = path_to_cstring(&path)?;
unsafe {
@ -205,8 +215,8 @@ impl Db {
let c_column_families = column_family_names
.iter()
.map(|name| CString::new(*name))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?;
.collect::<Result<Vec<_>, _>>()
.map_err(|e| StorageError::Other(Box::new(e)))?;
let cf_options = column_families
.into_iter()
.map(|cf| {
@ -241,13 +251,10 @@ impl Db {
))?;
assert!(!db.is_null(), "rocksdb_create returned null");
for handle in &cf_handles {
if handle.is_null() {
rocksdb_transactiondb_close(db);
return Err(Error::new(
ErrorKind::Other,
"Received null column family handle from RocksDB.",
));
}
assert!(
!handle.is_null(),
"rocksdb_readoptions_create returned a null column family"
);
}
let read_options = rocksdb_readoptions_create();
@ -346,10 +353,10 @@ impl Db {
}
}
pub fn transaction<'a, 'b: 'a, T>(
pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self,
f: impl Fn(Transaction<'a>) -> Result<T>,
) -> Result<T> {
f: impl Fn(Transaction<'a>) -> Result<T, E>,
) -> Result<T, E> {
loop {
let transaction = unsafe {
let transaction = rocksdb_transaction_begin(
@ -380,7 +387,8 @@ impl Db {
match result {
Ok(result) => {
unsafe {
ffi_result!(rocksdb_transaction_commit_with_status(transaction))?;
ffi_result!(rocksdb_transaction_commit_with_status(transaction))
.map_err(StorageError::from)?;
rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options);
}
@ -388,22 +396,22 @@ impl Db {
}
Err(e) => {
unsafe {
ffi_result!(rocksdb_transaction_rollback_with_status(transaction))?;
ffi_result!(rocksdb_transaction_rollback_with_status(transaction))
.map_err(StorageError::from)?;
rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options);
}
let is_conflict_error = e.get_ref().map_or(false, |error| {
let mut error: &dyn std::error::Error = error;
// We use the narrowest source, as it has a chance to be a RocksDB status
while let Some(e) = error.source() {
error = e;
}
// We look for the root error
let mut error: &(dyn Error + 'static) = &e;
while let Some(e) = error.source() {
error = e;
}
let is_conflict_error =
error.downcast_ref::<ErrorStatus>().map_or(false, |e| {
e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy
|| e.0.code == rocksdb_status_code_t_rocksdb_status_code_timed_out
|| e.0.code == rocksdb_status_code_t_rocksdb_status_code_try_again
})
});
});
if is_conflict_error {
// We give a chance to the OS to do something else before retrying in order to help avoiding an other conflict
yield_now();
@ -416,18 +424,19 @@ impl Db {
}
}
pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> {
pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf_with_status(
self.0.db,
self.0.flush_options,
column_family.0,
))
))?;
}
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> {
pub fn compact(&self, column_family: &ColumnFamily) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status(
self.0.db,
@ -437,11 +446,12 @@ impl Db {
0,
ptr::null(),
0,
))
))?;
}
Ok(())
}
pub fn new_sst_file(&self) -> Result<SstFileWriter> {
pub fn new_sst_file(&self) -> Result<SstFileWriter, StorageError> {
unsafe {
let path = self.0.path.join(random::<u128>().to_string());
let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
@ -453,7 +463,10 @@ impl Db {
}
}
pub fn insert_stt_files(&self, ssts_for_cf: &[(&ColumnFamily, PathBuf)]) -> Result<()> {
pub fn insert_stt_files(
&self,
ssts_for_cf: &[(&ColumnFamily, PathBuf)],
) -> Result<(), StorageError> {
let mut paths_by_cf = HashMap::<_, Vec<_>>::new();
for (cf, path) in ssts_for_cf {
paths_by_cf
@ -479,8 +492,9 @@ impl Db {
self.0.db,
args.as_ptr(),
args.len()
))
))?;
}
Ok(())
}
}
@ -530,7 +544,11 @@ impl Drop for Reader {
}
impl Reader {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
pub fn get(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<Option<PinnableSlice>, StorageError> {
unsafe {
let slice = match &self.inner {
InnerReader::Snapshot(inner) => {
@ -552,7 +570,9 @@ impl Reader {
key.len()
))
} else {
return Err(invalid_input_error("The transaction is already ended"));
return Err(StorageError::Other(
"The transaction is already ended".into(),
));
}
}
}?;
@ -564,15 +584,23 @@ impl Reader {
}
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
pub fn contains_key(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<bool, StorageError> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn iter(&self, column_family: &ColumnFamily) -> Result<Iter> {
pub fn iter(&self, column_family: &ColumnFamily) -> Result<Iter, StorageError> {
self.scan_prefix(column_family, &[])
}
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Result<Iter> {
pub fn scan_prefix(
&self,
column_family: &ColumnFamily,
prefix: &[u8],
) -> Result<Iter, StorageError> {
//We generate the upper bound
let upper_bound = {
let mut bound = prefix.to_vec();
@ -612,7 +640,9 @@ impl Reader {
if let Some(inner) = inner.upgrade() {
rocksdb_transaction_create_iterator_cf(*inner, options, column_family.0)
} else {
return Err(invalid_input_error("The transaction is already ended"));
return Err(StorageError::Other(
"The transaction is already ended".into(),
));
}
}
};
@ -633,7 +663,7 @@ impl Reader {
}
}
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize, StorageError> {
let mut count = 0;
let mut iter = self.iter(column_family)?;
while iter.is_valid() {
@ -644,7 +674,7 @@ impl Reader {
Ok(count)
}
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool, StorageError> {
let iter = self.iter(column_family)?;
iter.status()?; // We makes sure there is no read problem
Ok(!iter.is_valid())
@ -669,7 +699,7 @@ impl Transaction<'_> {
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<bool> {
) -> Result<bool, StorageError> {
unsafe {
let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf_with_status(
*self.transaction,
@ -682,7 +712,12 @@ impl Transaction<'_> {
}
}
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
pub fn insert(
&mut self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_transaction_put_cf_with_status(
*self.transaction,
@ -691,23 +726,29 @@ impl Transaction<'_> {
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
))?;
}
Ok(())
}
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
pub fn insert_empty(
&mut self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<(), StorageError> {
self.insert(column_family, key, &[])
}
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_transaction_delete_cf_with_status(
*self.transaction,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
))
))?;
}
Ok(())
}
}
@ -815,8 +856,11 @@ impl Iter {
self.is_currently_valid
}
pub fn status(&self) -> Result<()> {
unsafe { ffi_result!(rocksdb_iter_get_status(self.iter)) }
pub fn status(&self) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_iter_get_status(self.iter))?;
}
Ok(())
}
pub fn next(&mut self) {
@ -853,7 +897,7 @@ impl Drop for SstFileWriter {
}
impl SstFileWriter {
pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<(), StorageError> {
unsafe {
ffi_result!(rocksdb_sstfilewriter_put_with_status(
self.writer,
@ -861,15 +905,16 @@ impl SstFileWriter {
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
))?;
}
Ok(())
}
pub fn insert_empty(&mut self, key: &[u8]) -> Result<()> {
pub fn insert_empty(&mut self, key: &[u8]) -> Result<(), StorageError> {
self.insert(key, &[])
}
pub fn finish(self) -> Result<PathBuf> {
pub fn finish(self) -> Result<PathBuf, StorageError> {
unsafe {
ffi_result!(rocksdb_sstfilewriter_finish_with_status(self.writer))?;
}
@ -877,38 +922,6 @@ impl SstFileWriter {
}
}
/// Maps a raw RocksDB status onto the closest `io::ErrorKind`, wrapping the
/// status itself as the error payload. Returns `Ok(())` for the "ok" status.
fn convert_status(status: rocksdb_status_t) -> Result<()> {
    let kind = if status.code == rocksdb_status_code_t_rocksdb_status_code_ok {
        return Ok(()); // No error
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_not_supported {
        ErrorKind::Unsupported
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_invalid_argument {
        ErrorKind::InvalidInput
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_timed_out {
        ErrorKind::TimedOut
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_busy {
        ErrorKind::Other // TODO ErrorKind::ResourceBusy
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_expired {
        ErrorKind::TimedOut
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_io_error
        && status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_no_space
    {
        ErrorKind::Other // TODO ErrorKind::StorageFull
    } else if status.code == rocksdb_status_code_t_rocksdb_status_code_aborted
        && status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_memory_limit
    {
        ErrorKind::OutOfMemory
    } else if (status.code == rocksdb_status_code_t_rocksdb_status_code_not_found
        || status.code == rocksdb_status_code_t_rocksdb_status_code_io_error)
        && status.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_path_not_found
    {
        ErrorKind::NotFound
    } else {
        ErrorKind::Other
    };
    Err(Error::new(kind, ErrorStatus(status)))
}
/// Wrapper around a raw RocksDB status, used as an error payload.
#[derive(Debug)]
struct ErrorStatus(rocksdb_status_t);
@ -933,17 +946,39 @@ impl fmt::Display for ErrorStatus {
}
}
impl std::error::Error for ErrorStatus {}
impl Error for ErrorStatus {}
impl From<ErrorStatus> for StorageError {
    fn from(status: ErrorStatus) -> Self {
        if status.0.code == rocksdb_status_code_t_rocksdb_status_code_io_error {
            // I/O statuses become StorageError::Io with the closest io::ErrorKind,
            // picked from the status subcode.
            let kind =
                if status.0.subcode == rocksdb_status_subcode_t_rocksdb_status_subcode_no_space {
                    io::ErrorKind::Other // TODO ErrorKind::StorageFull
                } else if status.0.subcode
                    == rocksdb_status_subcode_t_rocksdb_status_subcode_path_not_found
                {
                    io::ErrorKind::NotFound
                } else {
                    io::ErrorKind::Other
                };
            Self::Io(io::Error::new(kind, status))
        } else if status.0.code == rocksdb_status_code_t_rocksdb_status_code_corruption {
            // Corruption statuses get their own variant so callers can react
            // to possible on-disk data damage.
            Self::Corruption(CorruptionError::new(status))
        } else {
            // Everything else is reported as an opaque error.
            Self::Other(Box::new(status))
        }
    }
}
struct UnsafeEnv(*mut rocksdb_env_t);
// Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB
unsafe impl Sync for UnsafeEnv {}
fn path_to_cstring(path: &Path) -> Result<CString> {
CString::new(
fn path_to_cstring(path: &Path) -> Result<CString, StorageError> {
Ok(CString::new(
path.to_str()
.ok_or_else(|| invalid_input_error("The DB path is not valid UTF-8"))?,
)
.map_err(invalid_input_error)
.map_err(|e| invalid_input_error(format!("The DB path contains null bytes: {}", e)))?)
}

@ -1,8 +1,8 @@
use crate::error::invalid_data_error;
use crate::model::xsd::*;
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, EncodedTriple, StrHash};
use crate::storage::small_string::SmallString;
use std::io;
use crate::storage::StorageError;
use crate::store::CorruptionError;
use std::io::{Cursor, Read};
use std::mem::size_of;
use std::rc::Rc;
@ -64,7 +64,7 @@ pub enum QuadEncoding {
}
impl QuadEncoding {
pub fn decode(self, buffer: &[u8]) -> io::Result<EncodedQuad> {
pub fn decode(self, buffer: &[u8]) -> Result<EncodedQuad, StorageError> {
let mut cursor = Cursor::new(&buffer);
match self {
QuadEncoding::Spog => cursor.read_spog_quad(),
@ -80,14 +80,14 @@ impl QuadEncoding {
}
}
pub fn decode_term(buffer: &[u8]) -> io::Result<EncodedTerm> {
pub fn decode_term(buffer: &[u8]) -> Result<EncodedTerm, StorageError> {
Cursor::new(&buffer).read_term()
}
pub trait TermReader {
fn read_term(&mut self) -> io::Result<EncodedTerm>;
fn read_term(&mut self) -> Result<EncodedTerm, StorageError>;
fn read_spog_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_spog_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
@ -100,7 +100,7 @@ pub trait TermReader {
})
}
fn read_posg_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_posg_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
@ -113,7 +113,7 @@ pub trait TermReader {
})
}
fn read_ospg_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_ospg_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
@ -126,7 +126,7 @@ pub trait TermReader {
})
}
fn read_gspo_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_gspo_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let graph_name = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
@ -139,7 +139,7 @@ pub trait TermReader {
})
}
fn read_gpos_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_gpos_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let graph_name = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
@ -152,7 +152,7 @@ pub trait TermReader {
})
}
fn read_gosp_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_gosp_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let graph_name = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
@ -165,7 +165,7 @@ pub trait TermReader {
})
}
fn read_dspo_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_dspo_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
@ -177,7 +177,7 @@ pub trait TermReader {
})
}
fn read_dpos_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_dpos_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
@ -189,7 +189,7 @@ pub trait TermReader {
})
}
fn read_dosp_quad(&mut self) -> io::Result<EncodedQuad> {
fn read_dosp_quad(&mut self) -> Result<EncodedQuad, StorageError> {
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
@ -203,7 +203,7 @@ pub trait TermReader {
}
impl<R: Read> TermReader for R {
fn read_term(&mut self) -> io::Result<EncodedTerm> {
fn read_term(&mut self) -> Result<EncodedTerm, StorageError> {
let mut type_buffer = [0];
self.read_exact(&mut type_buffer)?;
match type_buffer[0] {
@ -225,7 +225,7 @@ impl<R: Read> TermReader for R {
let mut buffer = [0; 16];
self.read_exact(&mut buffer)?;
Ok(EncodedTerm::SmallBlankNode(
SmallString::from_be_bytes(buffer).map_err(invalid_data_error)?,
SmallString::from_be_bytes(buffer).map_err(CorruptionError::new)?,
))
}
TYPE_BIG_BLANK_NODE_ID => {
@ -241,9 +241,10 @@ impl<R: Read> TermReader for R {
let mut value_buffer = [0; 16];
self.read_exact(&mut value_buffer)?;
Ok(EncodedTerm::SmallSmallLangStringLiteral {
value: SmallString::from_be_bytes(value_buffer).map_err(invalid_data_error)?,
value: SmallString::from_be_bytes(value_buffer)
.map_err(CorruptionError::new)?,
language: SmallString::from_be_bytes(language_buffer)
.map_err(invalid_data_error)?,
.map_err(CorruptionError::new)?,
})
}
TYPE_SMALL_BIG_LANG_STRING_LITERAL => {
@ -252,7 +253,8 @@ impl<R: Read> TermReader for R {
let mut value_buffer = [0; 16];
self.read_exact(&mut value_buffer)?;
Ok(EncodedTerm::SmallBigLangStringLiteral {
value: SmallString::from_be_bytes(value_buffer).map_err(invalid_data_error)?,
value: SmallString::from_be_bytes(value_buffer)
.map_err(CorruptionError::new)?,
language_id: StrHash::from_be_bytes(language_buffer),
})
}
@ -264,7 +266,7 @@ impl<R: Read> TermReader for R {
Ok(EncodedTerm::BigSmallLangStringLiteral {
value_id: StrHash::from_be_bytes(value_buffer),
language: SmallString::from_be_bytes(language_buffer)
.map_err(invalid_data_error)?,
.map_err(CorruptionError::new)?,
})
}
TYPE_BIG_BIG_LANG_STRING_LITERAL => {
@ -284,7 +286,8 @@ impl<R: Read> TermReader for R {
self.read_exact(&mut value_buffer)?;
Ok(EncodedTerm::SmallTypedLiteral {
datatype_id: StrHash::from_be_bytes(datatype_buffer),
value: SmallString::from_be_bytes(value_buffer).map_err(invalid_data_error)?,
value: SmallString::from_be_bytes(value_buffer)
.map_err(CorruptionError::new)?,
})
}
TYPE_BIG_TYPED_LITERAL => {
@ -301,7 +304,7 @@ impl<R: Read> TermReader for R {
let mut buffer = [0; 16];
self.read_exact(&mut buffer)?;
Ok(EncodedTerm::SmallStringLiteral(
SmallString::from_be_bytes(buffer).map_err(invalid_data_error)?,
SmallString::from_be_bytes(buffer).map_err(CorruptionError::new)?,
))
}
TYPE_BIG_STRING_LITERAL => {
@ -405,7 +408,7 @@ impl<R: Read> TermReader for R {
predicate: self.read_term()?,
object: self.read_term()?,
}))),
_ => Err(invalid_data_error("the term buffer has an invalid type id")),
_ => Err(CorruptionError::msg("the term buffer has an invalid type id").into()),
}
}
}
@ -646,7 +649,6 @@ mod tests {
use crate::storage::numeric_encoder::*;
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::Infallible;
#[derive(Default)]
struct MemoryStrStore {
@ -654,13 +656,11 @@ mod tests {
}
impl StrLookup for MemoryStrStore {
type Error = Infallible;
fn get_str(&self, key: &StrHash) -> Result<Option<String>, Infallible> {
fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self.id2str.borrow().get(key).cloned())
}
fn contains_str(&self, key: &StrHash) -> Result<bool, Infallible> {
fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
Ok(self.id2str.borrow().contains_key(key))
}
}
@ -669,8 +669,7 @@ mod tests {
fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(term, encoded, &mut |h, v| {
self.insert_str(h, v);
let r: Result<(), Infallible> = Ok(());
r
Ok(())
})
.unwrap();
}

@ -0,0 +1,206 @@
use crate::io::read::ParserError;
use std::error::Error;
use std::fmt;
use std::io;
/// An error related to storage operations (reads, writes...).
#[derive(Debug)]
#[non_exhaustive]
pub enum StorageError {
    /// Error from the OS I/O layer.
    Io(io::Error),
    /// Error related to data corruption.
    Corruption(CorruptionError),
    #[doc(hidden)]
    Other(Box<dyn Error + Send + Sync + 'static>),
}

impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the wrapped error's own message.
        match self {
            Self::Io(error) => write!(f, "{}", error),
            Self::Corruption(error) => write!(f, "{}", error),
            Self::Other(error) => write!(f, "{}", error),
        }
    }
}

impl Error for StorageError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // The wrapped error is always the cause.
        match self {
            Self::Io(error) => Some(error),
            Self::Corruption(error) => Some(error),
            Self::Other(error) => Some(error.as_ref()),
        }
    }
}

impl From<io::Error> for StorageError {
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}

impl From<StorageError> for io::Error {
    fn from(error: StorageError) -> Self {
        // Kept for compatibility with APIs still based on `io::Error`.
        match error {
            StorageError::Io(error) => error,
            StorageError::Corruption(error) => error.into(),
            StorageError::Other(error) => io::Error::new(io::ErrorKind::Other, error),
        }
    }
}

/// An error raised if some content of the database is corrupted.
#[derive(Debug)]
pub struct CorruptionError {
    inner: CorruptionErrorKind,
}

#[derive(Debug)]
enum CorruptionErrorKind {
    Msg(String),
    Other(Box<dyn Error + Send + Sync + 'static>),
}

impl CorruptionError {
    /// Builds a corruption error wrapping another error as its cause.
    pub(crate) fn new(error: impl Into<Box<dyn Error + Send + Sync + 'static>>) -> Self {
        Self {
            inner: CorruptionErrorKind::Other(error.into()),
        }
    }

    /// Builds a corruption error from a printable error message.
    pub(crate) fn msg(msg: impl Into<String>) -> Self {
        Self {
            inner: CorruptionErrorKind::Msg(msg.into()),
        }
    }
}

impl fmt::Display for CorruptionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match &self.inner {
            CorruptionErrorKind::Msg(msg) => f.write_str(msg),
            CorruptionErrorKind::Other(error) => write!(f, "{}", error),
        }
    }
}

impl Error for CorruptionError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        // Only the `Other` kind carries an underlying cause.
        if let CorruptionErrorKind::Other(error) = &self.inner {
            Some(error.as_ref())
        } else {
            None
        }
    }
}

impl From<CorruptionError> for StorageError {
    fn from(error: CorruptionError) -> Self {
        Self::Corruption(error)
    }
}

impl From<CorruptionError> for io::Error {
    fn from(error: CorruptionError) -> Self {
        // Corruption maps to `InvalidData` in the `io::Error` world.
        io::Error::new(io::ErrorKind::InvalidData, error)
    }
}
/// An error raised while loading a file into a [`Store`](crate::store::Store).
#[derive(Debug)]
pub enum LoaderError {
    /// An error raised while reading the file.
    Parser(ParserError),
    /// An error raised during the insertion in the store.
    Storage(StorageError),
}

impl fmt::Display for LoaderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the wrapped error's own message.
        match self {
            Self::Parser(error) => write!(f, "{}", error),
            Self::Storage(error) => write!(f, "{}", error),
        }
    }
}

impl Error for LoaderError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Parser(error) => Some(error),
            Self::Storage(error) => Some(error),
        }
    }
}

impl From<ParserError> for LoaderError {
    fn from(error: ParserError) -> Self {
        Self::Parser(error)
    }
}

impl From<StorageError> for LoaderError {
    fn from(error: StorageError) -> Self {
        Self::Storage(error)
    }
}

impl From<LoaderError> for io::Error {
    fn from(error: LoaderError) -> Self {
        // Kept for compatibility with APIs still based on `io::Error`.
        match error {
            LoaderError::Parser(error) => error.into(),
            LoaderError::Storage(error) => error.into(),
        }
    }
}
/// An error raised while writing a file from a [`Store`](crate::store::Store).
#[derive(Debug)]
pub enum SerializerError {
    /// An error raised while writing the content.
    Io(io::Error),
    /// An error raised during the lookup in the store.
    Storage(StorageError),
}

impl fmt::Display for SerializerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to the wrapped error's own message.
        match self {
            Self::Io(error) => write!(f, "{}", error),
            Self::Storage(error) => write!(f, "{}", error),
        }
    }
}

impl Error for SerializerError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Io(error) => Some(error),
            Self::Storage(error) => Some(error),
        }
    }
}

impl From<io::Error> for SerializerError {
    fn from(error: io::Error) -> Self {
        Self::Io(error)
    }
}

impl From<StorageError> for SerializerError {
    fn from(error: StorageError) -> Self {
        Self::Storage(error)
    }
}

impl From<SerializerError> for io::Error {
    fn from(error: SerializerError) -> Self {
        // Kept for compatibility with APIs still based on `io::Error`.
        match error {
            SerializerError::Io(error) => error,
            SerializerError::Storage(error) => error.into(),
        }
    }
}

@ -1,4 +1,3 @@
use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, Quad, QuadRef, TermRef};
use crate::storage::backend::{Reader, Transaction};
#[cfg(not(target_arch = "wasm32"))]
@ -9,11 +8,12 @@ use crate::storage::binary_encoder::{
write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding,
WRITTEN_TERM_MAX_SIZE,
};
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_arch = "wasm32"))]
use std::collections::{HashMap, HashSet};
use std::io::Result;
use std::error::Error;
#[cfg(not(target_arch = "wasm32"))]
use std::mem::take;
#[cfg(not(target_arch = "wasm32"))]
@ -23,6 +23,7 @@ use std::thread::spawn;
mod backend;
mod binary_encoder;
mod error;
pub mod numeric_encoder;
pub mod small_string;
@ -60,12 +61,12 @@ pub struct Storage {
}
impl Storage {
pub fn new() -> Result<Self> {
pub fn new() -> Result<Self, StorageError> {
Self::setup(Db::new(Self::column_families())?)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path) -> Result<Self> {
pub fn open(path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open(path, Self::column_families())?)
}
@ -129,7 +130,7 @@ impl Storage {
]
}
fn setup(db: Db) -> Result<Self> {
fn setup(db: Db) -> Result<Self, StorageError> {
let this = Self {
default_cf: db.column_family(DEFAULT_CF).unwrap(),
id2str_cf: db.column_family(ID2STR_CF).unwrap(),
@ -151,7 +152,7 @@ impl Storage {
}
#[cfg(not(target_arch = "wasm32"))]
fn migrate(&self) -> Result<()> {
fn migrate(&self) -> Result<(), StorageError> {
let mut version = self.ensure_version()?;
if version == 0 {
// We migrate to v1
@ -178,20 +179,20 @@ impl Storage {
}
match version {
_ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!(
_ if version < LATEST_STORAGE_VERSION => Err(CorruptionError::msg(format!(
"The RocksDB database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version",
version
))),
)).into()),
LATEST_STORAGE_VERSION => Ok(()),
_ => Err(invalid_data_error(format!(
_ => Err(CorruptionError::msg(format!(
"The RocksDB database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database",
version
)))
)).into())
}
}
#[cfg(not(target_arch = "wasm32"))]
fn ensure_version(&self) -> Result<u64> {
fn ensure_version(&self) -> Result<u64, StorageError> {
Ok(
if let Some(version) = self.db.snapshot().get(&self.default_cf, b"oxversion")? {
let mut buffer = [0; 8];
@ -205,7 +206,7 @@ impl Storage {
}
#[cfg(not(target_arch = "wasm32"))]
fn update_version(&self, version: u64) -> Result<()> {
fn update_version(&self, version: u64) -> Result<(), StorageError> {
self.db.transaction(|mut t| {
t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())
})?;
@ -219,10 +220,10 @@ impl Storage {
}
}
pub fn transaction<'a, 'b: 'a, T>(
pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
&'b self,
f: impl Fn(StorageWriter<'a>) -> Result<T>,
) -> Result<T> {
f: impl Fn(StorageWriter<'a>) -> Result<T, E>,
) -> Result<T, E> {
self.db.transaction(|transaction| {
f(StorageWriter {
buffer: Vec::new(),
@ -233,7 +234,7 @@ impl Storage {
}
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> Result<()> {
pub fn flush(&self) -> Result<(), StorageError> {
self.db.flush(&self.default_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gpos_cf)?;
@ -248,7 +249,7 @@ impl Storage {
}
#[cfg(not(target_arch = "wasm32"))]
pub fn compact(&self) -> Result<()> {
pub fn compact(&self) -> Result<(), StorageError> {
self.db.compact(&self.default_cf)?;
self.db.compact(&self.gpos_cf)?;
self.db.compact(&self.gpos_cf)?;
@ -269,16 +270,16 @@ pub struct StorageReader {
}
impl StorageReader {
pub fn len(&self) -> Result<usize> {
pub fn len(&self) -> Result<usize, StorageError> {
Ok(self.reader.len(&self.storage.gspo_cf)? + self.reader.len(&self.storage.dspo_cf)?)
}
pub fn is_empty(&self) -> Result<bool> {
pub fn is_empty(&self) -> Result<bool, StorageError> {
Ok(self.reader.is_empty(&self.storage.gspo_cf)?
&& self.reader.is_empty(&self.storage.dspo_cf)?)
}
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool, StorageError> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
@ -529,7 +530,7 @@ impl StorageReader {
}
}
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool> {
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool, StorageError> {
self.reader
.contains_key(&self.storage.graphs_cf, &encode_term(graph_name))
}
@ -582,15 +583,16 @@ impl StorageReader {
}
}
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
self.reader
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self
.reader
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
.map(|v| String::from_utf8(v.into()))
.transpose()
.map_err(invalid_data_error)
.map_err(CorruptionError::new)?)
}
pub fn contains_str(&self, key: &StrHash) -> Result<bool> {
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.reader
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
}
@ -618,9 +620,9 @@ impl ChainedDecodingQuadIterator {
}
impl Iterator for ChainedDecodingQuadIterator {
type Item = Result<EncodedQuad>;
type Item = Result<EncodedQuad, StorageError>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
if let Some(result) = self.first.next() {
Some(result)
} else if let Some(second) = self.second.as_mut() {
@ -637,9 +639,9 @@ pub struct DecodingQuadIterator {
}
impl Iterator for DecodingQuadIterator {
type Item = Result<EncodedQuad>;
type Item = Result<EncodedQuad, StorageError>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
if let Err(e) = self.iter.status() {
return Some(Err(e));
}
@ -654,9 +656,9 @@ pub struct DecodingGraphIterator {
}
impl Iterator for DecodingGraphIterator {
type Item = Result<EncodedTerm>;
type Item = Result<EncodedTerm, StorageError>;
fn next(&mut self) -> Option<Result<EncodedTerm>> {
fn next(&mut self) -> Option<Result<EncodedTerm, StorageError>> {
if let Err(e) = self.iter.status() {
return Some(Err(e));
}
@ -667,13 +669,11 @@ impl Iterator for DecodingGraphIterator {
}
impl StrLookup for StorageReader {
type Error = std::io::Error;
fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
self.get_str(key)
}
fn contains_str(&self, key: &StrHash) -> Result<bool> {
fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.contains_str(key)
}
}
@ -692,7 +692,7 @@ impl<'a> StorageWriter<'a> {
}
}
pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool> {
pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
let encoded = quad.into();
self.buffer.clear();
let result = if quad.graph_name.is_default_graph() {
@ -777,7 +777,10 @@ impl<'a> StorageWriter<'a> {
Ok(result)
}
pub fn insert_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
pub fn insert_named_graph(
&mut self,
graph_name: NamedOrBlankNodeRef<'_>,
) -> Result<bool, StorageError> {
let encoded_graph_name = graph_name.into();
self.buffer.clear();
@ -796,7 +799,11 @@ impl<'a> StorageWriter<'a> {
Ok(result)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
fn insert_term(
&mut self,
term: TermRef<'_>,
encoded: &EncodedTerm,
) -> Result<(), StorageError> {
insert_term(term, encoded, &mut |key, value| self.insert_str(key, value))
}
@ -804,7 +811,7 @@ impl<'a> StorageWriter<'a> {
&mut self,
graph_name: GraphNameRef<'_>,
encoded: &EncodedTerm,
) -> Result<()> {
) -> Result<(), StorageError> {
match graph_name {
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
@ -812,7 +819,7 @@ impl<'a> StorageWriter<'a> {
}
}
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<()> {
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
self.transaction.insert(
&self.storage.id2str_cf,
&key.to_be_bytes(),
@ -820,11 +827,11 @@ impl<'a> StorageWriter<'a> {
)
}
pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool> {
pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
self.remove_encoded(&quad.into())
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<bool> {
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<bool, StorageError> {
self.buffer.clear();
let result = if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
@ -891,7 +898,7 @@ impl<'a> StorageWriter<'a> {
Ok(result)
}
pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<()> {
pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> {
if graph_name.is_default_graph() {
for quad in self.reader().quads_for_graph(&EncodedTerm::DefaultGraph) {
self.remove_encoded(&quad?)?;
@ -912,25 +919,31 @@ impl<'a> StorageWriter<'a> {
Ok(())
}
pub fn clear_all_named_graphs(&mut self) -> Result<()> {
pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> {
for quad in self.reader().quads_in_named_graph() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_graphs(&mut self) -> Result<()> {
pub fn clear_all_graphs(&mut self) -> Result<(), StorageError> {
for quad in self.reader().quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn remove_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
pub fn remove_named_graph(
&mut self,
graph_name: NamedOrBlankNodeRef<'_>,
) -> Result<bool, StorageError> {
self.remove_encoded_named_graph(&graph_name.into())
}
fn remove_encoded_named_graph(&mut self, graph_name: &EncodedTerm) -> Result<bool> {
fn remove_encoded_named_graph(
&mut self,
graph_name: &EncodedTerm,
) -> Result<bool, StorageError> {
self.buffer.clear();
write_term(&mut self.buffer, graph_name);
let result = if self
@ -952,14 +965,14 @@ impl<'a> StorageWriter<'a> {
Ok(result)
}
pub fn remove_all_named_graphs(&mut self) -> Result<()> {
pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> {
for graph_name in self.reader().named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
Ok(())
}
pub fn clear(&mut self) -> Result<()> {
pub fn clear(&mut self) -> Result<(), StorageError> {
for graph_name in self.reader().named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
@ -972,7 +985,14 @@ impl<'a> StorageWriter<'a> {
/// Creates a database from a dataset files.
#[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load(storage: &Storage, quads: impl IntoIterator<Item = Result<Quad>>) -> Result<()> {
pub fn bulk_load<
EI,
EO: From<StorageError> + From<EI>,
I: IntoIterator<Item = Result<Quad, EI>>,
>(
storage: &Storage,
quads: I,
) -> Result<(), EO> {
let mut threads = Vec::new();
let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE);
for quad in quads {
@ -1015,7 +1035,7 @@ impl BulkLoader {
}
}
fn load(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<()> {
fn load(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
for quad in quads {
let encoded = EncodedQuad::from(quad.as_ref());
self.buffer.clear();
@ -1051,7 +1071,7 @@ impl BulkLoader {
self.save()
}
fn save(&mut self) -> Result<()> {
fn save(&mut self) -> Result<(), StorageError> {
let mut to_load = Vec::new();
// id2str
@ -1175,14 +1195,21 @@ impl BulkLoader {
self.storage.db.insert_stt_files(&to_load)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
fn insert_term(
&mut self,
term: TermRef<'_>,
encoded: &EncodedTerm,
) -> Result<(), StorageError> {
insert_term(term, encoded, &mut |key, value| {
self.id2str.entry(*key).or_insert_with(|| value.into());
Ok(())
})
}
fn build_sst_for_keys(&self, values: impl Iterator<Item = Vec<u8>>) -> Result<PathBuf> {
fn build_sst_for_keys(
&self,
values: impl Iterator<Item = Vec<u8>>,
) -> Result<PathBuf, StorageError> {
let mut values = values.collect::<Vec<_>>();
values.sort_unstable();
let mut sst = self.storage.db.new_sst_file()?;

@ -1,17 +1,15 @@
#![allow(clippy::unreadable_literal)]
use crate::error::invalid_data_error;
use crate::model::xsd::*;
use crate::model::*;
use crate::sparql::EvaluationError;
use crate::storage::small_string::SmallString;
use crate::store::{CorruptionError, StorageError};
use siphasher::sip128::{Hasher128, SipHasher24};
use std::error::Error;
use std::fmt::Debug;
use std::hash::Hash;
use std::hash::Hasher;
use std::rc::Rc;
use std::{fmt, io, str};
use std::str;
#[derive(Eq, PartialEq, Debug, Clone, Copy, Hash)]
#[repr(transparent)]
@ -658,18 +656,16 @@ impl From<QuadRef<'_>> for EncodedQuad {
}
pub trait StrLookup {
type Error: Error + Into<EvaluationError> + 'static;
fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError>;
fn get_str(&self, key: &StrHash) -> Result<Option<String>, Self::Error>;
fn contains_str(&self, key: &StrHash) -> Result<bool, Self::Error>;
fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError>;
}
pub fn insert_term<E, F: FnMut(&StrHash, &str) -> Result<(), E>>(
pub fn insert_term<F: FnMut(&StrHash, &str) -> Result<(), StorageError>>(
term: TermRef<'_>,
encoded: &EncodedTerm,
insert_str: &mut F,
) -> Result<(), E> {
) -> Result<(), StorageError> {
match term {
TermRef::NamedNode(node) => {
if let EncodedTerm::NamedNode { iri_id } = encoded {
@ -824,15 +820,16 @@ pub fn parse_day_time_duration_str(value: &str) -> Option<EncodedTerm> {
}
pub trait Decoder: StrLookup {
fn decode_term(&self, encoded: &EncodedTerm) -> Result<Term, DecoderError<Self::Error>>;
fn decode_term(&self, encoded: &EncodedTerm) -> Result<Term, StorageError>;
fn decode_subject(&self, encoded: &EncodedTerm) -> Result<Subject, DecoderError<Self::Error>> {
fn decode_subject(&self, encoded: &EncodedTerm) -> Result<Subject, StorageError> {
match self.decode_term(encoded)? {
Term::NamedNode(named_node) => Ok(named_node.into()),
Term::BlankNode(blank_node) => Ok(blank_node.into()),
Term::Literal(_) => Err(DecoderError::Decoder {
msg: "A literal has been found instead of a subject node".to_owned(),
}),
Term::Literal(_) => Err(CorruptionError::msg(
"A literal has been found instead of a subject node",
)
.into()),
Term::Triple(triple) => Ok(Subject::Triple(triple)),
}
}
@ -840,38 +837,38 @@ pub trait Decoder: StrLookup {
fn decode_named_or_blank_node(
&self,
encoded: &EncodedTerm,
) -> Result<NamedOrBlankNode, DecoderError<Self::Error>> {
) -> Result<NamedOrBlankNode, StorageError> {
match self.decode_term(encoded)? {
Term::NamedNode(named_node) => Ok(named_node.into()),
Term::BlankNode(blank_node) => Ok(blank_node.into()),
Term::Literal(_) => Err(DecoderError::Decoder {
msg: "A literal has been found instead of a named or blank node".to_owned(),
}),
Term::Triple(_) => Err(DecoderError::Decoder {
msg: "A triple has been found instead of a named or blank node".to_owned(),
}),
Term::Literal(_) => Err(CorruptionError::msg(
"A literal has been found instead of a named or blank node",
)
.into()),
Term::Triple(_) => Err(CorruptionError::msg(
"A triple has been found instead of a named or blank node",
)
.into()),
}
}
fn decode_named_node(
&self,
encoded: &EncodedTerm,
) -> Result<NamedNode, DecoderError<Self::Error>> {
fn decode_named_node(&self, encoded: &EncodedTerm) -> Result<NamedNode, StorageError> {
match self.decode_term(encoded)? {
Term::NamedNode(named_node) => Ok(named_node),
Term::BlankNode(_) => Err(DecoderError::Decoder {
msg: "A blank node has been found instead of a named node".to_owned(),
}),
Term::Literal(_) => Err(DecoderError::Decoder {
msg: "A literal has been found instead of a named node".to_owned(),
}),
Term::Triple(_) => Err(DecoderError::Decoder {
msg: "A triple has been found instead of a named node".to_owned(),
}),
Term::BlankNode(_) => Err(CorruptionError::msg(
"A blank node has been found instead of a named node",
)
.into()),
Term::Literal(_) => {
Err(CorruptionError::msg("A literal has been found instead of a named node").into())
}
Term::Triple(_) => {
Err(CorruptionError::msg("A triple has been found instead of a named node").into())
}
}
}
fn decode_triple(&self, encoded: &EncodedTriple) -> Result<Triple, DecoderError<Self::Error>> {
fn decode_triple(&self, encoded: &EncodedTriple) -> Result<Triple, StorageError> {
Ok(Triple::new(
self.decode_subject(&encoded.subject)?,
self.decode_named_node(&encoded.predicate)?,
@ -879,7 +876,7 @@ pub trait Decoder: StrLookup {
))
}
fn decode_quad(&self, encoded: &EncodedQuad) -> Result<Quad, DecoderError<Self::Error>> {
fn decode_quad(&self, encoded: &EncodedQuad) -> Result<Quad, StorageError> {
Ok(Quad::new(
self.decode_subject(&encoded.subject)?,
self.decode_named_node(&encoded.predicate)?,
@ -891,14 +888,14 @@ pub trait Decoder: StrLookup {
Term::NamedNode(named_node) => named_node.into(),
Term::BlankNode(blank_node) => blank_node.into(),
Term::Literal(_) => {
return Err(DecoderError::Decoder {
msg: "A literal is not a valid graph name".to_owned(),
})
return Err(
CorruptionError::msg("A literal is not a valid graph name").into()
)
}
Term::Triple(_) => {
return Err(DecoderError::Decoder {
msg: "A triple is not a valid graph name".to_owned(),
})
return Err(
CorruptionError::msg("A triple is not a valid graph name").into()
)
}
}
},
@ -907,11 +904,11 @@ pub trait Decoder: StrLookup {
}
impl<S: StrLookup> Decoder for S {
fn decode_term(&self, encoded: &EncodedTerm) -> Result<Term, DecoderError<Self::Error>> {
fn decode_term(&self, encoded: &EncodedTerm) -> Result<Term, StorageError> {
match encoded {
EncodedTerm::DefaultGraph => Err(DecoderError::Decoder {
msg: "The default graph tag is not a valid term".to_owned(),
}),
EncodedTerm::DefaultGraph => {
Err(CorruptionError::msg("The default graph tag is not a valid term").into())
}
EncodedTerm::NamedNode { iri_id } => {
Ok(NamedNode::new_unchecked(get_required_str(self, iri_id)?).into())
}
@ -987,50 +984,11 @@ impl<S: StrLookup> Decoder for S {
}
}
fn get_required_str<L: StrLookup>(
lookup: &L,
id: &StrHash,
) -> Result<String, DecoderError<L::Error>> {
lookup
.get_str(id)
.map_err(DecoderError::Store)?
.ok_or_else(|| DecoderError::Decoder {
msg: format!(
"Not able to find the string with id {:?} in the string store",
id
),
})
}
#[derive(Debug)]
pub enum DecoderError<E> {
Store(E),
Decoder { msg: String },
}
impl<E: fmt::Display> fmt::Display for DecoderError<E> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Store(e) => e.fmt(f),
Self::Decoder { msg } => write!(f, "{}", msg),
}
}
}
impl<E: Error + 'static> Error for DecoderError<E> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Store(e) => Some(e),
Self::Decoder { .. } => None,
}
}
}
impl<E: Into<Self>> From<DecoderError<E>> for io::Error {
fn from(e: DecoderError<E>) -> Self {
match e {
DecoderError::Store(e) => e.into(),
DecoderError::Decoder { msg } => invalid_data_error(msg),
}
}
/// Looks up the string associated with `id`, treating its absence as corruption.
///
/// A string referenced by an interned id must always be present in the string
/// store, so a missing entry means the database content is corrupted.
fn get_required_str<L: StrLookup>(lookup: &L, id: &StrHash) -> Result<String, StorageError> {
    // No `Ok(expr?)` wrapping needed: `ok_or_else` can build the final error directly
    // (clippy::needless_question_mark).
    lookup.get_str(id)?.ok_or_else(|| {
        CorruptionError::new(format!(
            "Not able to find the string with id {:?} in the string store",
            id
        ))
        .into()
    })
}

@ -23,7 +23,7 @@
//! };
//! # Result::<_,Box<dyn std::error::Error>>::Ok(())
//! ```
use crate::error::invalid_input_error;
use crate::io::read::ParserError;
use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
@ -36,10 +36,11 @@ use crate::sparql::{
use crate::storage::bulk_load;
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader};
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use std::io::{BufRead, Write};
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path;
use std::{fmt, io, str};
use std::{fmt, str};
/// An on-disk [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset).
/// Allows to query and update it using SPARQL.
@ -86,7 +87,7 @@ pub struct Store {
impl Store {
/// Creates a temporary [`Store`] that will be deleted after drop.
pub fn new() -> io::Result<Self> {
pub fn new() -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::new()?,
})
@ -94,7 +95,7 @@ impl Store {
/// Opens a [`Store`] and creates it if it does not exist yet.
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::open(path.as_ref())?,
})
@ -180,7 +181,7 @@ impl Store {
}
/// Checks if this store contains a given quad.
pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
let quad = EncodedQuad::from(quad.into());
self.storage.snapshot().contains(&quad)
}
@ -188,12 +189,12 @@ impl Store {
/// Returns the number of quads in the store.
///
/// Warning: this function executes a full scan.
pub fn len(&self) -> io::Result<usize> {
pub fn len(&self) -> Result<usize, StorageError> {
self.storage.snapshot().len()
}
/// Returns if the store is empty.
pub fn is_empty(&self) -> io::Result<bool> {
pub fn is_empty(&self) -> Result<bool, StorageError> {
self.storage.snapshot().is_empty()
}
@ -268,16 +269,16 @@ impl Store {
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), LoaderError> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
let quads = parser
.read_triples(reader)?
.collect::<io::Result<Vec<_>>>()?;
.collect::<Result<Vec<_>, _>>()?;
let to_graph_name = to_graph_name.into();
self.storage.transaction(move |mut t| {
for quad in &quads {
@ -317,14 +318,14 @@ impl Store {
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), LoaderError> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
let quads = parser.read_quads(reader)?.collect::<io::Result<Vec<_>>>()?;
let quads = parser.read_quads(reader)?.collect::<Result<Vec<_>, _>>()?;
self.storage.transaction(move |mut t| {
for quad in &quads {
t.insert(quad.into())?;
@ -351,7 +352,7 @@ impl Store {
/// assert!(store.contains(quad)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
let quad = quad.into();
self.storage.transaction(move |mut t| t.insert(quad))
}
@ -359,7 +360,7 @@ impl Store {
/// Adds atomically a set of quads to this store.
///
/// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`](Store::bulk_extend) if you plan to add ten of millions of triples.
pub fn extend(&self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
pub fn extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
let quads = quads.into_iter().collect::<Vec<_>>();
self.storage.transaction(move |mut t| {
for quad in &quads {
@ -388,7 +389,7 @@ impl Store {
/// assert!(!store.contains(quad)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> Result<bool, StorageError> {
let quad = quad.into();
self.storage.transaction(move |mut t| t.remove(quad))
}
@ -416,12 +417,13 @@ impl Store {
writer: impl Write,
format: GraphFormat,
from_graph_name: impl Into<GraphNameRef<'a>>,
) -> io::Result<()> {
) -> Result<(), SerializerError> {
let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?;
for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) {
writer.write(quad?.as_ref())?;
}
writer.finish()
writer.finish()?;
Ok(())
}
/// Dumps the store into a file.
@ -440,12 +442,17 @@ impl Store {
/// assert_eq!(file, buffer.as_slice());
/// # std::io::Result::Ok(())
/// ```
pub fn dump_dataset(&self, writer: impl Write, format: DatasetFormat) -> io::Result<()> {
pub fn dump_dataset(
&self,
writer: impl Write,
format: DatasetFormat,
) -> Result<(), SerializerError> {
let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?;
for quad in self.iter() {
writer.write(&quad?)?;
}
writer.finish()
writer.finish()?;
Ok(())
}
/// Returns all the store named graphs
@ -486,7 +493,7 @@ impl Store {
pub fn contains_named_graph<'a>(
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
) -> Result<bool, StorageError> {
let graph_name = EncodedTerm::from(graph_name.into());
self.storage.snapshot().contains_named_graph(&graph_name)
}
@ -510,7 +517,7 @@ impl Store {
pub fn insert_named_graph<'a>(
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
) -> Result<bool, StorageError> {
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.insert_named_graph(graph_name))
@ -534,7 +541,10 @@ impl Store {
/// assert_eq!(1, store.named_graphs().count());
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> io::Result<()> {
pub fn clear_graph<'a>(
&self,
graph_name: impl Into<GraphNameRef<'a>>,
) -> Result<(), StorageError> {
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.clear_graph(graph_name))
@ -563,7 +573,7 @@ impl Store {
pub fn remove_named_graph<'a>(
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
) -> Result<bool, StorageError> {
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.remove_named_graph(graph_name))
@ -586,7 +596,7 @@ impl Store {
/// assert!(store.is_empty()?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear(&self) -> io::Result<()> {
pub fn clear(&self) -> Result<(), StorageError> {
self.storage.transaction(|mut t| t.clear())
}
@ -594,7 +604,7 @@ impl Store {
///
/// Flushes are automatically done using background threads but might lag a little bit.
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> io::Result<()> {
pub fn flush(&self) -> Result<(), StorageError> {
self.storage.flush()
}
@ -604,7 +614,7 @@ impl Store {
///
/// Warning: Can take hours on huge databases.
#[cfg(not(target_arch = "wasm32"))]
pub fn optimize(&self) -> io::Result<()> {
pub fn optimize(&self) -> Result<(), StorageError> {
self.storage.compact()
}
@ -645,12 +655,12 @@ impl Store {
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), LoaderError> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
bulk_load(&self.storage, parser.read_quads(reader)?)
}
@ -693,19 +703,19 @@ impl Store {
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
) -> Result<(), LoaderError> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
.map_err(|e| ParserError::invalid_base_iri(base_iri, e))?;
}
let to_graph_name = to_graph_name.into();
bulk_load(
&self.storage,
parser
.read_triples(reader)?
.map(|r| Ok(r?.in_graph(to_graph_name.into_owned()))),
.map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))),
)
}
@ -717,8 +727,8 @@ impl Store {
///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
#[cfg(not(target_arch = "wasm32"))]
pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
bulk_load(&self.storage, quads.into_iter().map(Ok))
pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
bulk_load::<StorageError, _, _>(&self.storage, quads.into_iter().map(Ok))
}
}
@ -738,11 +748,11 @@ pub struct QuadIter {
}
impl Iterator for QuadIter {
type Item = io::Result<Quad>;
type Item = Result<Quad, StorageError>;
fn next(&mut self) -> Option<io::Result<Quad>> {
fn next(&mut self) -> Option<Result<Quad, StorageError>> {
Some(match self.iter.next()? {
Ok(quad) => self.reader.decode_quad(&quad).map_err(|e| e.into()),
Ok(quad) => self.reader.decode_quad(&quad),
Err(error) => Err(error),
})
}
@ -755,13 +765,13 @@ pub struct GraphNameIter {
}
impl Iterator for GraphNameIter {
type Item = io::Result<NamedOrBlankNode>;
type Item = Result<NamedOrBlankNode, StorageError>;
fn next(&mut self) -> Option<io::Result<NamedOrBlankNode>> {
fn next(&mut self) -> Option<Result<NamedOrBlankNode, StorageError>> {
Some(
self.iter
.next()?
.and_then(|graph_name| Ok(self.reader.decode_named_or_blank_node(&graph_name)?)),
.and_then(|graph_name| self.reader.decode_named_or_blank_node(&graph_name)),
)
}
@ -771,7 +781,7 @@ impl Iterator for GraphNameIter {
}
#[test]
fn store() -> io::Result<()> {
fn store() -> Result<(), StorageError> {
use crate::model::*;
let main_s = Subject::from(BlankNode::default());

@ -2,7 +2,8 @@ use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*;
use oxigraph::store::Store;
use std::io::{Cursor, Result};
use std::error::Error;
use std::io::Cursor;
use std::process::Command;
const DATA: &str = r#"
@ -74,7 +75,7 @@ fn quads(graph_name: impl Into<GraphNameRef<'static>>) -> Vec<QuadRef<'static>>
}
#[test]
fn test_load_graph() -> Result<()> {
fn test_load_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.load_graph(
Cursor::new(DATA),
@ -89,7 +90,7 @@ fn test_load_graph() -> Result<()> {
}
#[test]
fn test_load_dataset() -> Result<()> {
fn test_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?;
for q in quads(GraphNameRef::DefaultGraph) {
@ -99,7 +100,7 @@ fn test_load_dataset() -> Result<()> {
}
#[test]
fn test_bulk_load_dataset() -> Result<()> {
fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new().unwrap();
store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?;
for q in quads(GraphNameRef::DefaultGraph) {
@ -109,7 +110,7 @@ fn test_bulk_load_dataset() -> Result<()> {
}
#[test]
fn test_load_graph_generates_new_blank_nodes() -> Result<()> {
fn test_load_graph_generates_new_blank_nodes() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
for _ in 0..2 {
store.load_graph(
@ -124,7 +125,7 @@ fn test_load_graph_generates_new_blank_nodes() -> Result<()> {
}
#[test]
fn test_dump_graph() -> Result<()> {
fn test_dump_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
for q in quads(GraphNameRef::DefaultGraph) {
store.insert(q)?;
@ -144,7 +145,7 @@ fn test_dump_graph() -> Result<()> {
}
#[test]
fn test_dump_dataset() -> Result<()> {
fn test_dump_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
for q in quads(GraphNameRef::DefaultGraph) {
store.insert(q)?;
@ -160,7 +161,7 @@ fn test_dump_dataset() -> Result<()> {
}
#[test]
fn test_snapshot_isolation_iterator() -> Result<()> {
fn test_snapshot_isolation_iterator() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
@ -171,12 +172,15 @@ fn test_snapshot_isolation_iterator() -> Result<()> {
store.insert(quad)?;
let iter = store.iter();
store.remove(quad)?;
assert_eq!(iter.collect::<Result<Vec<_>>>()?, vec![quad.into_owned()]);
assert_eq!(
iter.collect::<Result<Vec<_>, _>>()?,
vec![quad.into_owned()]
);
Ok(())
}
#[test]
fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<()> {
fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
@ -192,7 +196,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<()> {
#[test]
#[cfg(target_os = "linux")]
fn test_backward_compatibility() -> Result<()> {
fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {
// We run twice to check if data is properly saved and closed
for _ in 0..2 {
let store = Store::open("tests/rocksdb_bc_data")?;
@ -207,14 +211,14 @@ fn test_backward_compatibility() -> Result<()> {
assert!(store.contains_named_graph(graph_name)?);
assert_eq!(
vec![NamedOrBlankNode::from(graph_name)],
store.named_graphs().collect::<Result<Vec<_>>>()?
store.named_graphs().collect::<Result<Vec<_>, _>>()?
);
}
reset_dir("tests/rocksdb_bc_data")?;
Ok(())
}
fn reset_dir(dir: &str) -> Result<()> {
fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> {
assert!(Command::new("git")
.args(&["clean", "-fX", dir])
.status()?

@ -1,7 +1,7 @@
#![allow(clippy::needless_option_as_deref)]
use crate::model::{PyQuad, PyTriple};
use oxigraph::io::read::{QuadReader, TripleReader};
use oxigraph::io::read::{ParserError, QuadReader, TripleReader};
use oxigraph::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
@ -61,7 +61,7 @@ pub fn parse(
.map_err(|e| PyValueError::new_err(e.to_string()))?;
}
Ok(PyTripleReader {
inner: parser.read_triples(input).map_err(map_io_err)?,
inner: parser.read_triples(input).map_err(map_parser_error)?,
}
.into_py(py))
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
@ -72,7 +72,7 @@ pub fn parse(
.map_err(|e| PyValueError::new_err(e.to_string()))?;
}
Ok(PyQuadReader {
inner: parser.read_quads(input).map_err(map_io_err)?,
inner: parser.read_quads(input).map_err(map_parser_error)?,
}
.into_py(py))
} else {
@ -158,7 +158,7 @@ impl PyTripleReader {
fn __next__(&mut self) -> PyResult<Option<PyTriple>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_io_err)?.into()))
.map(|q| Ok(q.map_err(map_parser_error)?.into()))
.transpose()
}
}
@ -177,7 +177,7 @@ impl PyQuadReader {
fn __next__(&mut self) -> PyResult<Option<PyQuad>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_io_err)?.into()))
.map(|q| Ok(q.map_err(map_parser_error)?.into()))
.transpose()
}
}
@ -243,12 +243,13 @@ fn to_io_err(error: impl Into<PyErr>, py: Python<'_>) -> io::Error {
}
}
pub fn map_io_err(error: io::Error) -> PyErr {
match error.kind() {
io::ErrorKind::InvalidInput => PyValueError::new_err(error.to_string()),
io::ErrorKind::InvalidData | io::ErrorKind::UnexpectedEof => {
PySyntaxError::new_err(error.to_string())
}
_ => PyIOError::new_err(error.to_string()),
/// Converts a Rust [`io::Error`] into a Python `IOError` exception,
/// using the error's display text as the exception message.
pub(crate) fn map_io_err(error: io::Error) -> PyErr {
    let message = error.to_string();
    PyIOError::new_err(message)
}
/// Converts a [`ParserError`] into the matching Python exception:
/// I/O failures are delegated to [`map_io_err`], syntax problems
/// become a Python `SyntaxError`.
pub(crate) fn map_parser_error(error: ParserError) -> PyErr {
    match error {
        ParserError::Io(e) => map_io_err(e),
        ParserError::Syntax(e) => PySyntaxError::new_err(e.to_string()),
    }
}

@ -1,4 +1,5 @@
use crate::io::map_io_err;
use crate::io::{map_io_err, map_parser_error};
use crate::map_storage_error;
use crate::model::*;
use oxigraph::model::Term;
use oxigraph::sparql::*;
@ -223,10 +224,12 @@ impl PyQueryTriples {
}
}
pub fn map_evaluation_error(error: EvaluationError) -> PyErr {
pub(crate) fn map_evaluation_error(error: EvaluationError) -> PyErr {
match error {
EvaluationError::Parsing(error) => PySyntaxError::new_err(error.to_string()),
EvaluationError::Storage(error) => map_storage_error(error),
EvaluationError::Io(error) => map_io_err(error),
EvaluationError::ExternalParser(error) => map_parser_error(error),
EvaluationError::Query(error) => PyValueError::new_err(error.to_string()),
_ => PyRuntimeError::new_err(error.to_string()),
}

@ -1,13 +1,13 @@
#![allow(clippy::needless_option_as_deref)]
use crate::io::{map_io_err, PyFileLike};
use crate::io::{map_parser_error, PyFileLike};
use crate::model::*;
use crate::sparql::*;
use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::GraphNameRef;
use oxigraph::sparql::Update;
use oxigraph::store::{self, Store};
use pyo3::exceptions::PyValueError;
use oxigraph::store::{self, LoaderError, SerializerError, StorageError, Store};
use pyo3::exceptions::{PyIOError, PyRuntimeError, PyValueError};
use pyo3::prelude::*;
use pyo3::{Py, PyRef};
use std::io::BufReader;
@ -48,7 +48,7 @@ impl PyStore {
} else {
Store::new()
}
.map_err(map_io_err)?,
.map_err(map_storage_error)?,
})
}
@ -64,7 +64,7 @@ impl PyStore {
/// [<Quad subject=<NamedNode value=http://example.com> predicate=<NamedNode value=http://example.com/p> object=<Literal value=1 datatype=<NamedNode value=http://www.w3.org/2001/XMLSchema#string>> graph_name=<NamedNode value=http://example.com/g>>]
#[pyo3(text_signature = "($self, quad)")]
fn add(&self, quad: &PyQuad) -> PyResult<()> {
self.inner.insert(quad).map_err(map_io_err)?;
self.inner.insert(quad).map_err(map_storage_error)?;
Ok(())
}
@ -82,7 +82,7 @@ impl PyStore {
/// []
#[pyo3(text_signature = "($self, quad)")]
fn remove(&self, quad: &PyQuad) -> PyResult<()> {
self.inner.remove(quad).map_err(map_io_err)?;
self.inner.remove(quad).map_err(map_storage_error)?;
Ok(())
}
@ -293,7 +293,7 @@ impl PyStore {
&to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
base_iri,
)
.map_err(map_io_err)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
@ -302,7 +302,7 @@ impl PyStore {
}
self.inner
.load_dataset(input, dataset_format, base_iri)
.map_err(map_io_err)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
@ -369,7 +369,7 @@ impl PyStore {
&to_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
base_iri,
)
.map_err(map_io_err)
.map_err(map_loader_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if to_graph_name.is_some() {
return Err(PyValueError::new_err(
@ -378,7 +378,7 @@ impl PyStore {
}
self.inner
.bulk_load_dataset(input, dataset_format, base_iri)
.map_err(map_io_err)
.map_err(map_loader_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
@ -432,7 +432,7 @@ impl PyStore {
graph_format,
&from_graph_name.unwrap_or(PyGraphNameRef::DefaultGraph),
)
.map_err(map_io_err)
.map_err(map_serializer_error)
} else if let Some(dataset_format) = DatasetFormat::from_media_type(mime_type) {
if from_graph_name.is_some() {
return Err(PyValueError::new_err(
@ -441,7 +441,7 @@ impl PyStore {
}
self.inner
.dump_dataset(output, dataset_format)
.map_err(map_io_err)
.map_err(map_serializer_error)
} else {
Err(PyValueError::new_err(format!(
"Not supported MIME type: {}",
@ -490,7 +490,7 @@ impl PyStore {
.insert_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name))
.map(|_| ()),
}
.map_err(map_io_err)
.map_err(map_storage_error)
}
/// Removes a graph from the store.
@ -519,7 +519,7 @@ impl PyStore {
.remove_named_graph(&PyNamedOrBlankNodeRef::BlankNode(graph_name))
.map(|_| ()),
}
.map_err(map_io_err)?;
.map_err(map_storage_error)?;
Ok(())
}
@ -528,15 +528,15 @@ impl PyStore {
}
fn __bool__(&self) -> PyResult<bool> {
Ok(!self.inner.is_empty()?)
Ok(!self.inner.is_empty().map_err(map_storage_error)?)
}
fn __len__(&self) -> PyResult<usize> {
Ok(self.inner.len()?)
self.inner.len().map_err(map_storage_error)
}
fn __contains__(&self, quad: PyQuad) -> PyResult<bool> {
self.inner.contains(&quad).map_err(map_io_err)
self.inner.contains(&quad).map_err(map_storage_error)
}
fn __iter__(&self) -> QuadIter {
@ -560,7 +560,7 @@ impl QuadIter {
fn __next__(&mut self) -> PyResult<Option<PyQuad>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_io_err)?.into()))
.map(|q| Ok(q.map_err(map_storage_error)?.into()))
.transpose()
}
}
@ -579,7 +579,7 @@ impl GraphNameIter {
fn __next__(&mut self) -> PyResult<Option<PyNamedOrBlankNode>> {
self.inner
.next()
.map(|q| Ok(q.map_err(map_io_err)?.into()))
.map(|q| Ok(q.map_err(map_storage_error)?.into()))
.transpose()
}
}
@ -622,3 +622,24 @@ pub fn extract_quads_pattern<'a>(
},
))
}
pub(crate) fn map_storage_error(error: StorageError) -> PyErr {
match error {
StorageError::Io(error) => PyIOError::new_err(error.to_string()),
_ => PyRuntimeError::new_err(error.to_string()),
}
}
/// Converts a [`LoaderError`] into the matching Python exception by
/// delegating each variant to its dedicated converter.
pub(crate) fn map_loader_error(error: LoaderError) -> PyErr {
    match error {
        LoaderError::Parser(parser_error) => map_parser_error(parser_error),
        LoaderError::Storage(storage_error) => map_storage_error(storage_error),
    }
}
/// Converts a [`SerializerError`] into the matching Python exception:
/// write-side I/O failures become `IOError`, storage failures are
/// delegated to [`map_storage_error`].
pub(crate) fn map_serializer_error(error: SerializerError) -> PyErr {
    match error {
        SerializerError::Io(io_error) => PyIOError::new_err(io_error.to_string()),
        SerializerError::Storage(storage_error) => map_storage_error(storage_error),
    }
}

@ -21,7 +21,7 @@ use rand::random;
use std::cell::RefCell;
use std::cmp::min;
use std::fs::File;
use std::io::{BufReader, Error, ErrorKind, Read, Write};
use std::io::{self, BufReader, ErrorKind, Read, Write};
use std::rc::Rc;
use std::str::FromStr;
use std::thread::{spawn, JoinHandle};
@ -90,7 +90,7 @@ pub fn main() -> std::io::Result<()> {
.or_else(|| GraphFormat::from_extension(extension)?.try_into().ok())
})
.ok_or_else(|| {
Error::new(
io::Error::new(
ErrorKind::InvalidInput,
"The server is not able to guess the file format of {} from its extension",
)
@ -98,11 +98,12 @@ pub fn main() -> std::io::Result<()> {
store.bulk_load_dataset(BufReader::new(File::open(file)?), format, None)?;
Ok(())
})
}).collect::<Vec<JoinHandle<Result<(),Error>>>>();
}).collect::<Vec<JoinHandle<io::Result<()>>>>();
for handle in handles {
handle.join().unwrap()?;
}
store.optimize()
store.optimize()?;
Ok(())
}
("serve", Some(submatches)) => {
let bind = submatches.value_of("bind").unwrap();
@ -112,7 +113,8 @@ pub fn main() -> std::io::Result<()> {
.set_server_name(concat!("Oxigraph/", env!("CARGO_PKG_VERSION")))
.unwrap();
println!("Listening for requests at http://{}", &bind);
server.listen(bind)
server.listen(bind)?;
Ok(())
}
(s, _) => {
eprintln!("Not supported subcommand: '{}'", s);
@ -597,7 +599,7 @@ fn evaluate_sparql_query(
},
|(mut writer, mut triples)| {
Ok(if let Some(t) = triples.next() {
writer.write(&t.map_err(|e| Error::new(ErrorKind::Other, e))?)?;
writer.write(&t?)?;
Some((writer, triples))
} else {
writer.finish()?;

@ -87,7 +87,14 @@ impl fmt::Display for ParseError {
}
}
impl Error for ParseError {}
impl Error for ParseError {
    /// Exposes the wrapped cause — either the invalid base IRI error or the
    /// underlying parser error — so callers can walk the error chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match &self.inner {
            ParseErrorKind::InvalidBaseIri(e) => Some(e),
            ParseErrorKind::Parser(e) => Some(e),
        }
    }
}
struct AnnotatedTerm {
term: TermPattern,

Loading…
Cancel
Save