Simplifies the public API

pull/35/head
Tpt 5 years ago
parent 2d31de987a
commit cdb4cc4a39
  1. 33
      js/src/store.rs
  2. 14
      lib/benches/sparql_query.rs
  3. 12
      lib/src/error.rs
  4. 30
      lib/src/lib.rs
  5. 305
      lib/src/repository.rs
  6. 13
      lib/src/sparql/mod.rs
  7. 714
      lib/src/store/memory.rs
  8. 261
      lib/src/store/mod.rs
  9. 9
      lib/src/store/numeric_encoder.rs
  10. 463
      lib/src/store/rocksdb.rs
  11. 34
      lib/tests/service_test_cases.rs
  12. 52
      lib/tests/sparql_test_cases.rs
  13. 17
      lib/tests/wasm.rs
  14. 97
      server/src/main.rs
  15. 18
      wikibase/src/loader.rs
  16. 65
      wikibase/src/main.rs

@ -4,10 +4,7 @@ use crate::utils::to_err;
use js_sys::{Array, Map}; use js_sys::{Array, Map};
use oxigraph::model::NamedOrBlankNode; use oxigraph::model::NamedOrBlankNode;
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult};
use oxigraph::{ use oxigraph::{DatasetSyntax, Error, FileSyntax, GraphSyntax, MemoryStore};
DatasetSyntax, Error, FileSyntax, GraphSyntax, MemoryRepository, Repository,
RepositoryConnection,
};
use std::convert::TryInto; use std::convert::TryInto;
use std::io::Cursor; use std::io::Cursor;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
@ -15,7 +12,7 @@ use wasm_bindgen::prelude::*;
#[wasm_bindgen(js_name = MemoryStore)] #[wasm_bindgen(js_name = MemoryStore)]
#[derive(Default)] #[derive(Default)]
pub struct JsMemoryStore { pub struct JsMemoryStore {
store: MemoryRepository, store: MemoryStore,
from_js: FromJsConverter, from_js: FromJsConverter,
} }
@ -25,13 +22,13 @@ impl JsMemoryStore {
pub fn new(quads: Option<Box<[JsValue]>>) -> Result<JsMemoryStore, JsValue> { pub fn new(quads: Option<Box<[JsValue]>>) -> Result<JsMemoryStore, JsValue> {
console_error_panic_hook::set_once(); console_error_panic_hook::set_once();
let this = Self::default(); let store = Self::default();
if let Some(quads) = quads { if let Some(quads) = quads {
for quad in quads.iter() { for quad in quads.iter() {
this.add(quad)?; store.add(quad)?;
} }
} }
Ok(this) Ok(store)
} }
#[wasm_bindgen(js_name = dataFactory, getter)] #[wasm_bindgen(js_name = dataFactory, getter)]
@ -41,26 +38,20 @@ impl JsMemoryStore {
pub fn add(&self, quad: &JsValue) -> Result<(), JsValue> { pub fn add(&self, quad: &JsValue) -> Result<(), JsValue> {
self.store self.store
.connection()
.map_err(to_err)?
.insert(&self.from_js.to_quad(quad)?.try_into()?) .insert(&self.from_js.to_quad(quad)?.try_into()?)
.map_err(to_err) .map_err(to_err)
} }
pub fn delete(&self, quad: &JsValue) -> Result<(), JsValue> { pub fn delete(&self, quad: &JsValue) -> Result<(), JsValue> {
self.store self.store
.connection()
.map_err(to_err)?
.remove(&self.from_js.to_quad(quad)?.try_into()?) .remove(&self.from_js.to_quad(quad)?.try_into()?)
.map_err(to_err) .map_err(to_err)
} }
pub fn has(&self, quad: &JsValue) -> Result<bool, JsValue> { pub fn has(&self, quad: &JsValue) -> Result<bool, JsValue> {
self.store Ok(self
.connection() .store
.map_err(to_err)? .contains(&self.from_js.to_quad(quad)?.try_into()?))
.contains(&self.from_js.to_quad(quad)?.try_into()?)
.map_err(to_err)
} }
#[wasm_bindgen(js_name = match)] #[wasm_bindgen(js_name = match)]
@ -73,8 +64,6 @@ impl JsMemoryStore {
) -> Result<Box<[JsValue]>, JsValue> { ) -> Result<Box<[JsValue]>, JsValue> {
Ok(self Ok(self
.store .store
.connection()
.map_err(to_err)?
.quads_for_pattern( .quads_for_pattern(
match self.from_js.to_optional_term(subject)? { match self.from_js.to_optional_term(subject)? {
Some(JsTerm::NamedNode(node)) => Some(node.into()), Some(JsTerm::NamedNode(node)) => Some(node.into()),
@ -123,8 +112,6 @@ impl JsMemoryStore {
pub fn query(&self, query: &str) -> Result<JsValue, JsValue> { pub fn query(&self, query: &str) -> Result<JsValue, JsValue> {
let query = self let query = self
.store .store
.connection()
.map_err(to_err)?
.prepare_query(query, QueryOptions::default()) .prepare_query(query, QueryOptions::default())
.map_err(to_err)?; .map_err(to_err)?;
let results = query.exec().map_err(to_err)?; let results = query.exec().map_err(to_err)?;
@ -194,8 +181,6 @@ impl JsMemoryStore {
if let Some(graph_syntax) = GraphSyntax::from_mime_type(mime_type) { if let Some(graph_syntax) = GraphSyntax::from_mime_type(mime_type) {
self.store self.store
.connection()
.map_err(to_err)?
.load_graph( .load_graph(
Cursor::new(data), Cursor::new(data),
graph_syntax, graph_syntax,
@ -210,8 +195,6 @@ impl JsMemoryStore {
)); ));
} }
self.store self.store
.connection()
.map_err(to_err)?
.load_dataset(Cursor::new(data), dataset_syntax, base_iri.as_deref()) .load_dataset(Cursor::new(data), dataset_syntax, base_iri.as_deref())
.map_err(to_err) .map_err(to_err)
} else { } else {

@ -79,19 +79,17 @@ fn read_file_to_string(url: &str) -> Result<String> {
} }
fn load_graph(url: &str) -> Result<SimpleGraph> { fn load_graph(url: &str) -> Result<SimpleGraph> {
let repository = MemoryRepository::default(); let mut store = MemoryStore::default();
load_graph_to_repository(url, &mut repository.connection().unwrap(), None)?; load_graph_to_store(url, &store, None)?;
Ok(repository Ok(store
.connection()
.unwrap()
.quads_for_pattern(None, None, None, Some(None)) .quads_for_pattern(None, None, None, Some(None))
.map(|q| q.unwrap().into_triple()) .map(|q| q.unwrap().into_triple())
.collect()) .collect())
} }
fn load_graph_to_repository( fn load_graph_to_store(
url: &str, url: &str,
connection: &mut <&MemoryRepository as Repository>::Connection, store: &MemoryStore,
to_graph_name: Option<&NamedOrBlankNode>, to_graph_name: Option<&NamedOrBlankNode>,
) -> Result<()> { ) -> Result<()> {
let syntax = if url.ends_with(".nt") { let syntax = if url.ends_with(".nt") {
@ -106,7 +104,7 @@ fn load_graph_to_repository(
url url
))); )));
}; };
connection.load_graph(read_file(url)?, syntax, to_graph_name, Some(url)) store.load_graph(read_file(url)?, syntax, to_graph_name, Some(url))
} }
mod mf { mod mf {

@ -8,7 +8,6 @@ use std::error;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::string::FromUtf8Error; use std::string::FromUtf8Error;
use std::sync::PoisonError;
/// The Oxigraph error type. /// The Oxigraph error type.
/// ///
@ -25,7 +24,6 @@ impl fmt::Display for Error {
ErrorKind::Msg { msg } => write!(f, "{}", msg), ErrorKind::Msg { msg } => write!(f, "{}", msg),
ErrorKind::Io(e) => e.fmt(f), ErrorKind::Io(e) => e.fmt(f),
ErrorKind::FromUtf8(e) => e.fmt(f), ErrorKind::FromUtf8(e) => e.fmt(f),
ErrorKind::Poison => write!(f, "Mutex was poisoned"),
ErrorKind::Iri(e) => e.fmt(f), ErrorKind::Iri(e) => e.fmt(f),
ErrorKind::LanguageTag(e) => e.fmt(f), ErrorKind::LanguageTag(e) => e.fmt(f),
ErrorKind::Other(e) => e.fmt(f), ErrorKind::Other(e) => e.fmt(f),
@ -39,7 +37,6 @@ impl error::Error for Error {
ErrorKind::Msg { .. } => None, ErrorKind::Msg { .. } => None,
ErrorKind::Io(e) => Some(e), ErrorKind::Io(e) => Some(e),
ErrorKind::FromUtf8(e) => Some(e), ErrorKind::FromUtf8(e) => Some(e),
ErrorKind::Poison => None,
ErrorKind::Iri(e) => Some(e), ErrorKind::Iri(e) => Some(e),
ErrorKind::LanguageTag(e) => Some(e), ErrorKind::LanguageTag(e) => Some(e),
ErrorKind::Other(e) => Some(e.as_ref()), ErrorKind::Other(e) => Some(e.as_ref()),
@ -68,7 +65,6 @@ enum ErrorKind {
Msg { msg: String }, Msg { msg: String },
Io(io::Error), Io(io::Error),
FromUtf8(FromUtf8Error), FromUtf8(FromUtf8Error),
Poison,
Iri(IriParseError), Iri(IriParseError),
LanguageTag(LanguageTagParseError), LanguageTag(LanguageTagParseError),
Other(Box<dyn error::Error + Send + Sync + 'static>), Other(Box<dyn error::Error + Send + Sync + 'static>),
@ -130,14 +126,6 @@ impl From<ParseError<LineCol>> for Error {
} }
} }
impl<T> From<PoisonError<T>> for Error {
fn from(_: PoisonError<T>) -> Self {
Self {
inner: ErrorKind::Poison,
}
}
}
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
impl From<rocksdb::Error> for Error { impl From<rocksdb::Error> for Error {
fn from(error: rocksdb::Error) -> Self { fn from(error: rocksdb::Error) -> Self {

@ -2,32 +2,31 @@
//! //!
//! Its goal is to provide a compliant, safe and fast graph database. //! Its goal is to provide a compliant, safe and fast graph database.
//! //!
//! It currently provides two `Repository` implementation providing [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) capability: //! It currently provides two `Store` implementation providing [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) capability:
//! * `MemoryRepository`: a simple in memory implementation. //! * `MemoryStore`: a simple in memory implementation.
//! * `RocksDbRepository`: a file system implementation based on the [RocksDB](https://rocksdb.org/) key-value store. //! * `RocksDbStore`: a file system implementation based on the [RocksDB](https://rocksdb.org/) key-value store.
//! //!
//! Usage example with the `MemoryRepository`: //! Usage example with the `MemoryStore`:
//! //!
//! ``` //! ```
//! use oxigraph::model::*; //! use oxigraph::model::*;
//! use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result}; //! use oxigraph::{MemoryStore, Result};
//! use crate::oxigraph::sparql::{PreparedQuery, QueryOptions}; //! use crate::oxigraph::sparql::{PreparedQuery, QueryOptions};
//! use oxigraph::sparql::QueryResult; //! use oxigraph::sparql::QueryResult;
//! //!
//! let repository = MemoryRepository::default(); //! let store = MemoryStore::default();
//! let mut connection = repository.connection()?;
//! //!
//! // insertion //! // insertion
//! let ex = NamedNode::parse("http://example.com")?; //! let ex = NamedNode::parse("http://example.com")?;
//! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); //! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
//! connection.insert(&quad)?; //! store.insert(&quad)?;
//! //!
//! // quad filter //! // quad filter
//! let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect(); //! let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
//! assert_eq!(vec![quad], results?); //! assert_eq!(vec![quad], results?);
//! //!
//! // SPARQL query //! // SPARQL query
//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; //! let prepared_query = store.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
//! let results = prepared_query.exec()?; //! let results = prepared_query.exec()?;
//! if let QueryResult::Bindings(results) = results { //! if let QueryResult::Bindings(results) = results {
//! assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); //! assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
@ -105,19 +104,18 @@
mod error; mod error;
pub mod model; pub mod model;
mod repository;
pub mod sparql; pub mod sparql;
pub(crate) mod store; pub(crate) mod store;
mod syntax; mod syntax;
pub use error::Error; pub use error::Error;
pub type Result<T> = ::std::result::Result<T, Error>; pub type Result<T> = ::std::result::Result<T, Error>;
pub use crate::repository::Repository; pub use crate::store::MemoryStore;
pub use crate::repository::RepositoryConnection; pub use crate::store::MemoryTransaction;
pub use crate::repository::RepositoryTransaction;
pub use crate::store::MemoryRepository;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
pub use crate::store::RocksDbRepository; pub use crate::store::RocksDbStore;
#[cfg(feature = "rocksdb")]
pub use crate::store::RocksDbTransaction;
pub use crate::syntax::DatasetSyntax; pub use crate::syntax::DatasetSyntax;
pub use crate::syntax::FileSyntax; pub use crate::syntax::FileSyntax;
pub use crate::syntax::GraphSyntax; pub use crate::syntax::GraphSyntax;

@ -1,305 +0,0 @@
use crate::model::*;
use crate::sparql::{GraphPattern, PreparedQuery, QueryOptions};
use crate::{DatasetSyntax, GraphSyntax, Result};
use std::io::BufRead;
/// A `Repository` stores a [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset)
/// and allows to query and update it using SPARQL.
///
/// This crate currently provides two implementation of the `Repository` traits:
/// * One in memory: `MemoryRepository`
/// * One disk-based using [RocksDB](https://rocksdb.org/): `RocksDbRepository`
///
/// Usage example with `MemoryRepository`:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use crate::oxigraph::sparql::{PreparedQuery, QueryOptions};
/// use oxigraph::sparql::QueryResult;
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results?);
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
///
/// The implementation based on RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated.
/// A `RocksDbRepository` could be built using `RocksDbRepository::open` and works just like its in-memory equivalent:
/// ```ignore
/// use oxigraph::RocksDbRepository;
/// let dataset = RocksDbRepository::open("example.db")?;
/// ```
///
/// If you want transaction with [ACID](https://en.wikipedia.org/wiki/ACID) properties you could use the `RepositoryConnection.transaction` method.
/// This transaction support is only limited to writes and does not support reads as part of transactions yet.
pub trait Repository {
type Connection: RepositoryConnection;
fn connection(self) -> Result<Self::Connection>;
}
/// A connection to a `Repository`
pub trait RepositoryConnection: Clone {
type Transaction: RepositoryTransaction;
type PreparedQuery: PreparedQuery;
/// Prepares a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) and returns an object that could be used to execute it.
///
/// The implementation is a work in progress, SPARQL 1.1 specific features are not implemented yet.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use oxigraph::sparql::{PreparedQuery, QueryOptions};
/// use oxigraph::sparql::QueryResult;
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// // insertions
/// let ex = NamedNode::parse("http://example.com")?;
/// connection.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
fn prepare_query(&self, query: &str, options: QueryOptions<'_>) -> Result<Self::PreparedQuery>;
/// This is similar to `prepare_query`, but useful if a SPARQL query has already been parsed, which is the case when building `ServiceHandler`s for federated queries with `SERVICE` clauses. For examples, look in the tests.
fn prepare_query_from_pattern(
&self,
graph_pattern: &GraphPattern,
options: QueryOptions<'_>,
) -> Result<Self::PreparedQuery>;
/// Retrieves quads with a filter on each quad component
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results?);
/// # Result::Ok(())
/// ```
#[allow(clippy::option_option)]
fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<Option<&NamedOrBlankNode>>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a;
/// Checks if this repository contains a given quad
fn contains(&self, quad: &Quad) -> Result<bool>;
/// Executes a transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the clusre returns `Err`.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
///
/// // transaction
/// connection.transaction(|transaction| {
/// transaction.insert(&quad)
/// });
///
/// // quad filter
/// assert!(connection.contains(&quad).unwrap());
/// # Result::Ok(())
/// ```
fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()>;
/// Loads a graph file (i.e. triples) into the repository
///
/// Warning: This functions saves the triples in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, GraphSyntax};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// connection.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()>;
/// Loads a dataset file (i.e. quads) into the repository.
///
/// Warning: This functions saves the quads in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, DatasetSyntax};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// connection.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()>;
/// Adds a quad to this repository.
///
/// If you want to insert a lot of quads at the same time,
/// you should probably use an `auto_transaction`.
///
/// To make a transaction, you could use `transaction`.
fn insert(&mut self, quad: &Quad) -> Result<()>;
/// Removes a quad from this repository.
///
/// If you want to remove a lot of quads at the same time,
/// you should probably use an `auto_transaction`.
///
/// To make a transaction, you could use `transaction`.
fn remove(&mut self, quad: &Quad) -> Result<()>;
}
/// A transaction done on a `RepositoryConnection`
pub trait RepositoryTransaction {
/// Adds quads from a graph file into the transaction insertions.
///
/// Warning: It loads all the files triples into main memory.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, GraphSyntax};
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// connection.transaction(|transaction|
/// transaction.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None)
/// );
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()>;
/// Adds quads from a dataset file into the transaction insertions.
///
/// Warning: It loads all the files quads into main memory.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, DatasetSyntax};
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// connection.transaction(|transaction|
/// transaction.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None)
/// );
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()>;
/// Adds a quad insertion to this transaction
fn insert(&mut self, quad: &Quad) -> Result<()>;
/// Adds a quad removals for this transaction
fn remove(&mut self, quad: &Quad) -> Result<()>;
}

@ -35,7 +35,7 @@ pub trait PreparedQuery {
} }
/// An implementation of `PreparedQuery` for internal use /// An implementation of `PreparedQuery` for internal use
pub struct SimplePreparedQuery<S: ReadableEncodedStore>(SimplePreparedQueryAction<S>); pub(crate) struct SimplePreparedQuery<S: ReadableEncodedStore>(SimplePreparedQueryAction<S>);
enum SimplePreparedQueryAction<S: ReadableEncodedStore> { enum SimplePreparedQueryAction<S: ReadableEncodedStore> {
Select { Select {
@ -59,8 +59,8 @@ enum SimplePreparedQueryAction<S: ReadableEncodedStore> {
} }
impl<'a, S: ReadableEncodedStore + 'a> SimplePreparedQuery<S> { impl<'a, S: ReadableEncodedStore + 'a> SimplePreparedQuery<S> {
pub(crate) fn new(connection: S, query: &str, options: QueryOptions<'_>) -> Result<Self> { pub(crate) fn new(store: S, query: &str, options: QueryOptions<'_>) -> Result<Self> {
let dataset = DatasetView::new(connection, options.default_graph_as_union); let dataset = DatasetView::new(store, options.default_graph_as_union);
Ok(Self(match read_sparql_query(query, options.base_iri)? { Ok(Self(match read_sparql_query(query, options.base_iri)? {
QueryVariants::Select { QueryVariants::Select {
algebra, base_iri, .. algebra, base_iri, ..
@ -112,11 +112,11 @@ impl<'a, S: ReadableEncodedStore + 'a> SimplePreparedQuery<S> {
/// Builds `SimplePreparedQuery` from an existing `GraphPattern`. This is used to support federated queries via `SERVICE` clauses /// Builds `SimplePreparedQuery` from an existing `GraphPattern`. This is used to support federated queries via `SERVICE` clauses
pub(crate) fn new_from_pattern( pub(crate) fn new_from_pattern(
connection: S, store: S,
pattern: &GraphPattern, pattern: &GraphPattern,
options: QueryOptions<'_>, options: QueryOptions<'_>,
) -> Result<Self> { ) -> Result<Self> {
let dataset = DatasetView::new(connection, options.default_graph_as_union); let dataset = DatasetView::new(store, options.default_graph_as_union);
let (plan, variables) = PlanBuilder::build(dataset.encoder(), pattern)?; let (plan, variables) = PlanBuilder::build(dataset.encoder(), pattern)?;
let base_iri = if let Some(base_iri) = options.base_iri { let base_iri = if let Some(base_iri) = options.base_iri {
Some(Iri::parse(base_iri.to_string())?) Some(Iri::parse(base_iri.to_string())?)
@ -208,13 +208,12 @@ impl<'a> QueryOptions<'a> {
self self
} }
/// Consider the union of all graphs in the repository as the default graph /// Consider the union of all graphs in the store as the default graph
pub const fn with_default_graph_as_union(mut self) -> Self { pub const fn with_default_graph_as_union(mut self) -> Self {
self.default_graph_as_union = true; self.default_graph_as_union = true;
self self
} }
/// Consider the union of all graphs in the repository as the default graph
pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self { pub fn with_service_handler(mut self, service_handler: impl ServiceHandler + 'static) -> Self {
self.service_handler = Box::new(service_handler); self.service_handler = Box::new(service_handler);
self self

@ -1,54 +1,51 @@
use crate::model::*;
use crate::sparql::{PreparedQuery, QueryOptions, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::*; use crate::store::*;
use crate::{Repository, Result}; use crate::{DatasetSyntax, GraphSyntax, Result};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::hash::Hash; use std::hash::Hash;
use std::io::BufRead;
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
/// Memory based implementation of the `Repository` trait. /// Memory based store.
/// It is cheap to build using the `MemoryRepository::default()` method. /// It encodes a [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset) and allows to query and update it using SPARQL.
/// It is cheap to build using the `MemoryStore::new()` method.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
/// use oxigraph::model::*; /// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result}; /// use oxigraph::{MemoryStore, Result};
/// use crate::oxigraph::sparql::PreparedQuery; /// use oxigraph::sparql::{PreparedQuery, QueryResult, QueryOptions};
/// use oxigraph::sparql::{QueryResult, QueryOptions};
/// ///
/// let repository = MemoryRepository::default(); /// let store = MemoryStore::new();
/// let mut connection = repository.connection()?;
/// ///
/// // insertion /// // insertion
/// let ex = NamedNode::parse("http://example.com")?; /// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad)?; /// store.insert(&quad)?;
/// ///
/// // quad filter /// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect(); /// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results?); /// assert_eq!(vec![quad], results?);
/// ///
/// // SPARQL query /// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; /// let prepared_query = store.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?; /// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results { /// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// } /// }
/// # Result::Ok(()) /// # Result::Ok(())
/// ``` /// ```
#[derive(Default)] #[derive(Clone)]
pub struct MemoryRepository { pub struct MemoryStore {
inner: MemoryStore, indexes: Arc<RwLock<MemoryStoreIndexes>>,
} }
pub type MemoryRepositoryConnection<'a> = StoreRepositoryConnection<&'a MemoryStore>;
type TripleMap<T> = HashMap<T, HashMap<T, HashSet<T>>>; type TripleMap<T> = HashMap<T, HashMap<T, HashSet<T>>>;
type QuadMap<T> = HashMap<T, TripleMap<T>>; type QuadMap<T> = HashMap<T, TripleMap<T>>;
pub struct MemoryStore {
indexes: RwLock<MemoryStoreIndexes>,
}
#[derive(Default)] #[derive(Default)]
struct MemoryStoreIndexes { struct MemoryStoreIndexes {
spog: QuadMap<EncodedTerm>, spog: QuadMap<EncodedTerm>,
@ -62,208 +59,260 @@ struct MemoryStoreIndexes {
impl Default for MemoryStore { impl Default for MemoryStore {
fn default() -> Self { fn default() -> Self {
let new = Self { Self::new()
indexes: RwLock::default(),
};
let mut transaction = (&new).connection().unwrap().transaction();
transaction.set_first_strings().unwrap();
transaction.commit().unwrap();
new
} }
} }
impl<'a> Repository for &'a MemoryRepository { impl MemoryStore {
type Connection = MemoryRepositoryConnection<'a>; /// Constructs a new `MemoryStore`
pub fn new() -> Self {
fn connection(self) -> Result<StoreRepositoryConnection<&'a MemoryStore>> { let mut new = Self {
Ok(self.inner.connection()?.into()) indexes: Arc::new(RwLock::default()),
};
new.set_first_strings().unwrap();
new
} }
}
impl<'a> Store for &'a MemoryStore {
type Connection = &'a MemoryStore;
fn connection(self) -> Result<Self::Connection> { /// Prepares a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) and returns an object that could be used to execute it.
Ok(self) ///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result};
/// use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult};
///
/// let store = MemoryStore::default();
///
/// // insertions
/// let ex = NamedNode::parse("http://example.com")?;
/// store.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// // SPARQL query
/// let prepared_query = store.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
pub fn prepare_query(
&self,
query: &str,
options: QueryOptions<'_>,
) -> Result<impl PreparedQuery> {
SimplePreparedQuery::new(self.clone(), query, options)
} }
}
impl<'a> StrLookup for &'a MemoryStore { /// This is similar to `prepare_query`, but useful if a SPARQL query has already been parsed, which is the case when building `ServiceHandler`s for federated queries with `SERVICE` clauses. For examples, look in the tests.
fn get_str(&self, id: StrHash) -> Result<Option<String>> { pub fn prepare_query_from_pattern(
//TODO: avoid copy by adding a lifetime limit to get_str &self,
Ok(self.indexes()?.id2str.get(&id).cloned()) graph_pattern: &GraphPattern,
options: QueryOptions<'_>,
) -> Result<impl PreparedQuery> {
SimplePreparedQuery::new_from_pattern(self.clone(), graph_pattern, options)
}
/// Retrieves quads with a filter on each quad component
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result};
///
/// let store = MemoryStore::default();
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// store.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results?);
/// # Result::Ok(())
/// ```
#[allow(clippy::option_option)]
pub fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<Option<&NamedOrBlankNode>>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a,
{
let subject = subject.map(|s| s.into());
let predicate = predicate.map(|p| p.into());
let object = object.map(|o| o.into());
let graph_name = graph_name.map(|g| g.map_or(ENCODED_DEFAULT_GRAPH, |g| g.into()));
Box::new(
self.encoded_quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.decode_quad(&quad?)),
)
} }
}
impl<'a> StrContainer for &'a MemoryStore { /// Checks if this store contains a given quad
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { pub fn contains(&self, quad: &Quad) -> bool {
self.indexes_mut()? let quad = quad.into();
.id2str self.contains_encoded(&quad)
.entry(key) }
.or_insert_with(|| value.to_owned());
Ok(()) /// Executes a transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the clusre returns `Err`.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result};
///
/// let store = MemoryStore::default();
///
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
///
/// // transaction
/// store.transaction(|transaction| {
/// transaction.insert(&quad)
/// });
///
/// // quad filter
/// assert!(store.contains(&quad));
/// # Result::Ok(())
/// ```
pub fn transaction<'a>(
&'a self,
f: impl FnOnce(&mut MemoryTransaction<'a>) -> Result<()>,
) -> Result<()> {
let mut transaction = MemoryTransaction {
store: self,
ops: Vec::new(),
strings: Vec::new(),
};
f(&mut transaction)?;
transaction.commit()
}
/// Loads a graph file (i.e. triples) into the store.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result, GraphSyntax};
///
/// let store = MemoryStore::default();
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
pub fn load_graph(
&self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
let mut store = self;
load_graph(&mut store, reader, syntax, to_graph_name, base_iri)
}
/// Loads a dataset file (i.e. quads) into the store.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result, DatasetSyntax};
///
/// let store = MemoryStore::default();
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
pub fn load_dataset(
&self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let mut store = self;
load_dataset(&mut store, reader, syntax, base_iri)
} }
}
impl<'a> StoreConnection for &'a MemoryStore { /// Adds a quad to this store.
type Transaction = MemoryTransaction<'a>; pub fn insert(&self, quad: &Quad) -> Result<()> {
type AutoTransaction = &'a MemoryStore; let mut store = self;
let quad = store.encode_quad(quad)?;
store.insert_encoded(&quad)
}
fn transaction(&self) -> MemoryTransaction<'a> { /// Removes a quad from this store.
MemoryTransaction { pub fn remove(&self, quad: &Quad) -> Result<()> {
store: self, let mut store = self;
ops: Vec::default(), let quad = quad.into();
strings: Vec::default(), store.remove_encoded(&quad)
} }
fn indexes(&self) -> RwLockReadGuard<'_, MemoryStoreIndexes> {
self.indexes
.read()
.expect("the Memory store mutex has been poisoned because of a panic")
} }
fn auto_transaction(&self) -> &'a MemoryStore { fn indexes_mut(&self) -> RwLockWriteGuard<'_, MemoryStoreIndexes> {
self self.indexes
.write()
.expect("the Memory store mutex has been poisoned because of a panic")
} }
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains_encoded(&self, quad: &EncodedQuad) -> bool {
Ok(self self.indexes().spog.get(&quad.subject).map_or(false, |pog| {
.indexes()?
.spog
.get(&quad.subject)
.map_or(false, |pog| {
pog.get(&quad.predicate).map_or(false, |og| { pog.get(&quad.predicate).map_or(false, |og| {
og.get(&quad.object) og.get(&quad.object)
.map_or(false, |g| g.contains(&quad.graph_name)) .map_or(false, |g| g.contains(&quad.graph_name))
}) })
})) })
}
fn quads_for_pattern<'b>(
&'b self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object),
),
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name),
),
None => wrap_error(self.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => wrap_error(self.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name))
}
None => wrap_error(self.quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()),
},
},
},
}
}
}
impl<'a> StoreTransaction for &'a MemoryStore {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut()?.insert_quad(quad);
Ok(())
}
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut()?.remove_quad(quad);
Ok(())
}
fn commit(self) -> Result<()> {
Ok(())
}
}
impl MemoryStore {
fn indexes(&self) -> Result<RwLockReadGuard<'_, MemoryStoreIndexes>> {
Ok(self.indexes.read()?)
}
fn indexes_mut(&self) -> Result<RwLockWriteGuard<'_, MemoryStoreIndexes>> {
Ok(self.indexes.write()?)
} }
fn quads<'a>(&'a self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { fn encoded_quads<'a>(&'a self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
Ok(quad_map_flatten(&self.indexes()?.gspo) Ok(quad_map_flatten(&self.indexes().gspo)
.map(|(g, s, p, o)| Ok(EncodedQuad::new(s, p, o, g))) .map(|(g, s, p, o)| Ok(EncodedQuad::new(s, p, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter()) .into_iter())
} }
fn quads_for_subject( fn encoded_quads_for_subject(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(option_triple_map_flatten(self.indexes().spog.get(&subject))
option_triple_map_flatten(self.indexes()?.spog.get(&subject))
.map(|(p, o, g)| Ok(EncodedQuad::new(subject, p, o, g))) .map(|(p, o, g)| Ok(EncodedQuad::new(subject, p, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter())
)
} }
fn quads_for_subject_predicate( fn encoded_quads_for_subject_predicate(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.spog .spog
.get(&subject) .get(&subject)
.and_then(|pog| pog.get(&predicate)), .and_then(|pog| pog.get(&predicate)),
@ -273,14 +322,14 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_subject_predicate_object( fn encoded_quads_for_subject_predicate_object(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.indexes()? self.indexes()
.spog .spog
.get(&subject) .get(&subject)
.and_then(|pog| pog.get(&predicate)) .and_then(|pog| pog.get(&predicate))
@ -291,13 +340,13 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_subject_object( fn encoded_quads_for_subject_object(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.ospg .ospg
.get(&object) .get(&object)
.and_then(|spg| spg.get(&subject)), .and_then(|spg| spg.get(&subject)),
@ -307,25 +356,25 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_predicate( fn encoded_quads_for_predicate(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(
option_triple_map_flatten(self.indexes()?.posg.get(&predicate)) option_triple_map_flatten(self.indexes().posg.get(&predicate))
.map(|(o, s, g)| Ok(EncodedQuad::new(s, predicate, o, g))) .map(|(o, s, g)| Ok(EncodedQuad::new(s, predicate, o, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter(),
) )
} }
fn quads_for_predicate_object( fn encoded_quads_for_predicate_object(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.posg .posg
.get(&predicate) .get(&predicate)
.and_then(|osg| osg.get(&object)), .and_then(|osg| osg.get(&object)),
@ -335,35 +384,35 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_object( fn encoded_quads_for_object(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_triple_map_flatten(self.indexes()?.ospg.get(&object)) Ok(option_triple_map_flatten(self.indexes().ospg.get(&object))
.map(|(s, p, g)| Ok(EncodedQuad::new(s, p, object, g))) .map(|(s, p, g)| Ok(EncodedQuad::new(s, p, object, g)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter()) .into_iter())
} }
fn quads_for_graph( fn encoded_quads_for_graph(
&self, &self,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok( Ok(
option_triple_map_flatten(self.indexes()?.gspo.get(&graph_name)) option_triple_map_flatten(self.indexes().gspo.get(&graph_name))
.map(|(s, p, o)| Ok(EncodedQuad::new(s, p, o, graph_name))) .map(|(s, p, o)| Ok(EncodedQuad::new(s, p, o, graph_name)))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.into_iter(), .into_iter(),
) )
} }
fn quads_for_subject_graph( fn encoded_quads_for_subject_graph(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.gspo .gspo
.get(&graph_name) .get(&graph_name)
.and_then(|spo| spo.get(&subject)), .and_then(|spo| spo.get(&subject)),
@ -373,14 +422,14 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_subject_predicate_graph( fn encoded_quads_for_subject_predicate_graph(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.indexes()? self.indexes()
.gspo .gspo
.get(&graph_name) .get(&graph_name)
.and_then(|spo| spo.get(&subject)) .and_then(|spo| spo.get(&subject))
@ -391,14 +440,14 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_subject_object_graph( fn encoded_quads_for_subject_object_graph(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.indexes()? self.indexes()
.gosp .gosp
.get(&graph_name) .get(&graph_name)
.and_then(|osp| osp.get(&object)) .and_then(|osp| osp.get(&object))
@ -409,13 +458,13 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_predicate_graph( fn encoded_quads_for_predicate_graph(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.gpos .gpos
.get(&graph_name) .get(&graph_name)
.and_then(|pos| pos.get(&predicate)), .and_then(|pos| pos.get(&predicate)),
@ -425,14 +474,14 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_predicate_object_graph( fn encoded_quads_for_predicate_object_graph(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_set_flatten( Ok(option_set_flatten(
self.indexes()? self.indexes()
.gpos .gpos
.get(&graph_name) .get(&graph_name)
.and_then(|pos| pos.get(&predicate)) .and_then(|pos| pos.get(&predicate))
@ -443,13 +492,13 @@ impl MemoryStore {
.into_iter()) .into_iter())
} }
fn quads_for_object_graph( fn encoded_quads_for_object_graph(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>>> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>>> {
Ok(option_pair_map_flatten( Ok(option_pair_map_flatten(
self.indexes()? self.indexes()
.gosp .gosp
.get(&graph_name) .get(&graph_name)
.and_then(|osp| osp.get(&object)), .and_then(|osp| osp.get(&object)),
@ -460,8 +509,149 @@ impl MemoryStore {
} }
} }
impl MemoryStoreIndexes { impl StrLookup for MemoryStore {
fn insert_quad(&mut self, quad: &EncodedQuad) { fn get_str(&self, id: StrHash) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str
self.indexes().get_str(id)
}
}
impl StrLookup for MemoryStoreIndexes {
fn get_str(&self, id: StrHash) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.id2str.get(&id).cloned())
}
}
impl StrContainer for MemoryStore {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.indexes_mut().insert_str(key, value)
}
}
impl<'a> StrContainer for &'a MemoryStore {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.indexes_mut().insert_str(key, value)
}
}
impl StrContainer for MemoryStoreIndexes {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.id2str.entry(key).or_insert_with(|| value.to_owned());
Ok(())
}
}
impl<'a> ReadableEncodedStore for MemoryStore {
fn encoded_quads_for_pattern<'b>(
&'b self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
if self.contains_encoded(&quad) {
Box::new(once(Ok(quad)))
} else {
Box::new(empty())
}
}
None => wrap_error(self.encoded_quads_for_subject_predicate_object(
subject, predicate, object,
)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.encoded_quads_for_subject_predicate_graph(
subject, predicate, graph_name,
))
}
None => {
wrap_error(self.encoded_quads_for_subject_predicate(subject, predicate))
}
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.encoded_quads_for_subject_object_graph(
subject, object, graph_name,
))
}
None => wrap_error(self.encoded_quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.encoded_quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.encoded_quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.encoded_quads_for_predicate_object_graph(
predicate, object, graph_name,
))
}
None => {
wrap_error(self.encoded_quads_for_predicate_object(predicate, object))
}
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.encoded_quads_for_predicate_graph(predicate, graph_name),
),
None => wrap_error(self.encoded_quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.encoded_quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.encoded_quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.encoded_quads_for_graph(graph_name)),
None => wrap_error(self.encoded_quads()),
},
},
},
}
}
}
impl WritableEncodedStore for MemoryStore {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut().insert_encoded(quad)
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut().remove_encoded(quad)
}
}
impl<'a> WritableEncodedStore for &'a MemoryStore {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut().insert_encoded(quad)
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.indexes_mut().remove_encoded(quad)
}
}
impl WritableEncodedStore for MemoryStoreIndexes {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
insert_into_quad_map( insert_into_quad_map(
&mut self.gosp, &mut self.gosp,
quad.graph_name, quad.graph_name,
@ -504,9 +694,10 @@ impl MemoryStoreIndexes {
quad.object, quad.object,
quad.graph_name, quad.graph_name,
); );
Ok(())
} }
fn remove_quad(&mut self, quad: &EncodedQuad) { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
remove_from_quad_map( remove_from_quad_map(
&mut self.gosp, &mut self.gosp,
&quad.graph_name, &quad.graph_name,
@ -549,6 +740,7 @@ impl MemoryStoreIndexes {
&quad.object, &quad.object,
&quad.graph_name, &quad.graph_name,
); );
Ok(())
} }
} }
@ -638,6 +830,7 @@ fn quad_map_flatten<'a, T: Copy>(gspo: &'a QuadMap<T>) -> impl Iterator<Item = (
}) })
} }
/// Allows to insert and delete quads during a transaction with the `MemoryStore`.
pub struct MemoryTransaction<'a> { pub struct MemoryTransaction<'a> {
store: &'a MemoryStore, store: &'a MemoryStore,
ops: Vec<TransactionOp>, ops: Vec<TransactionOp>,
@ -649,6 +842,91 @@ enum TransactionOp {
Delete(EncodedQuad), Delete(EncodedQuad),
} }
impl<'a> MemoryTransaction<'a> {
/// Loads a graph file (i.e. triples) into the store during the transaction.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result, GraphSyntax};
///
/// let store = MemoryStore::default();
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.transaction(|transaction| {
/// store.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None)
/// })?;
///
/// // quad filter
/// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
pub fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
load_graph(self, reader, syntax, to_graph_name, base_iri)
}
/// Loads a dataset file (i.e. quads) into the store during the transaction.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{MemoryStore, Result, DatasetSyntax};
///
/// let store = MemoryStore::default();
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
pub fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
load_dataset(self, reader, syntax, base_iri)
}
/// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) -> Result<()> {
let quad = self.encode_quad(quad)?;
self.insert_encoded(&quad)
}
/// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) -> Result<()> {
let quad = quad.into();
self.remove_encoded(&quad)
}
fn commit(self) -> Result<()> {
let mut indexes = self.store.indexes_mut();
indexes.id2str.extend(self.strings);
for op in self.ops {
match op {
TransactionOp::Insert(quad) => indexes.insert_encoded(&quad)?,
TransactionOp::Delete(quad) => indexes.remove_encoded(&quad)?,
}
}
Ok(())
}
}
impl StrContainer for MemoryTransaction<'_> { impl StrContainer for MemoryTransaction<'_> {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.strings.push((key, value.to_owned())); self.strings.push((key, value.to_owned()));
@ -656,26 +934,14 @@ impl StrContainer for MemoryTransaction<'_> {
} }
} }
impl StoreTransaction for MemoryTransaction<'_> { impl WritableEncodedStore for MemoryTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.ops.push(TransactionOp::Insert(*quad)); self.ops.push(TransactionOp::Insert(*quad));
Ok(()) Ok(())
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.ops.push(TransactionOp::Delete(*quad)); self.ops.push(TransactionOp::Delete(*quad));
Ok(()) Ok(())
} }
fn commit(self) -> Result<()> {
let mut indexes = self.store.indexes_mut()?;
indexes.id2str.extend(self.strings);
for op in self.ops {
match op {
TransactionOp::Insert(quad) => indexes.insert_quad(&quad),
TransactionOp::Delete(quad) => indexes.remove_quad(&quad),
}
}
Ok(())
}
} }

@ -1,20 +1,21 @@
//! Provides implementations of the `oxigraph::Repository` trait. //! Store implementations
mod memory; mod memory;
pub(crate) mod numeric_encoder; pub(crate) mod numeric_encoder;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
mod rocksdb; mod rocksdb;
pub use crate::sparql::GraphPattern; use crate::sparql::GraphPattern;
pub use crate::store::memory::MemoryRepository; pub use crate::store::memory::MemoryStore;
pub use crate::store::memory::MemoryTransaction;
#[cfg(feature = "rocksdb")] #[cfg(feature = "rocksdb")]
pub use crate::store::rocksdb::RocksDbRepository; pub use crate::store::rocksdb::RocksDbStore;
#[cfg(feature = "rocksdb")]
pub use crate::store::rocksdb::RocksDbTransaction;
use crate::model::*; use crate::model::*;
use crate::repository::RepositoryTransaction;
use crate::sparql::{QueryOptions, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::{DatasetSyntax, Error, GraphSyntax, RepositoryConnection, Result}; use crate::{DatasetSyntax, Error, GraphSyntax, Result};
use rio_api::parser::{QuadsParser, TriplesParser}; use rio_api::parser::{QuadsParser, TriplesParser};
use rio_turtle::{NQuadsParser, NTriplesParser, TriGParser, TurtleParser}; use rio_turtle::{NQuadsParser, NTriplesParser, TriGParser, TurtleParser};
use rio_xml::RdfXmlParser; use rio_xml::RdfXmlParser;
@ -22,42 +23,6 @@ use std::collections::HashMap;
use std::io::BufRead; use std::io::BufRead;
use std::iter::Iterator; use std::iter::Iterator;
/// Defines the `Store` traits that is used to have efficient binary storage
pub trait Store {
type Connection: StoreConnection;
fn connection(self) -> Result<Self::Connection>;
}
/// A connection to a `Store`
pub trait StoreConnection: StrLookup + Sized + Clone {
type Transaction: StoreTransaction;
type AutoTransaction: StoreTransaction;
fn transaction(&self) -> Self::Transaction;
fn auto_transaction(&self) -> Self::AutoTransaction;
fn contains(&self, quad: &EncodedQuad) -> Result<bool>;
fn quads_for_pattern<'a>(
&'a self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>;
}
/// A transaction
pub trait StoreTransaction: StrContainer + Sized {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()>;
fn remove(&mut self, quad: &EncodedQuad) -> Result<()>;
fn commit(self) -> Result<()>;
}
pub trait ReadableEncodedStore: StrLookup + Sized { pub trait ReadableEncodedStore: StrLookup + Sized {
fn encoded_quads_for_pattern<'a>( fn encoded_quads_for_pattern<'a>(
&'a self, &'a self,
@ -68,210 +33,76 @@ pub trait ReadableEncodedStore: StrLookup + Sized {
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>; ) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>;
} }
impl<S: StoreConnection> ReadableEncodedStore for S { pub trait WritableEncodedStore: StrContainer + Sized {
fn encoded_quads_for_pattern<'a>( fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()>;
&'a self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
self.quads_for_pattern(subject, predicate, object, graph_name)
}
}
/// A `RepositoryConnection` from a `StoreConnection` fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()>;
#[derive(Clone)]
pub struct StoreRepositoryConnection<S: StoreConnection> {
inner: S,
}
impl<S: StoreConnection> From<S> for StoreRepositoryConnection<S> {
fn from(inner: S) -> Self {
Self { inner }
}
}
impl<S: StoreConnection> StoreRepositoryConnection<S> {
#[must_use]
fn auto_transaction(&self) -> StoreRepositoryTransaction<S::AutoTransaction> {
StoreRepositoryTransaction {
inner: self.inner.auto_transaction(),
}
}
} }
impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> { fn load_graph<S: WritableEncodedStore>(
type Transaction = StoreRepositoryTransaction<S::Transaction>; store: &mut S,
type PreparedQuery = SimplePreparedQuery<S>;
fn prepare_query(
&self,
query: &str,
options: QueryOptions<'_>,
) -> Result<SimplePreparedQuery<S>> {
SimplePreparedQuery::new(self.inner.clone(), query, options) //TODO: avoid clone
}
fn prepare_query_from_pattern(
&self,
pattern: &GraphPattern,
options: QueryOptions<'_>,
) -> Result<Self::PreparedQuery> {
SimplePreparedQuery::new_from_pattern(self.inner.clone(), pattern, options)
//TODO: avoid clone
}
fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<Option<&NamedOrBlankNode>>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a,
{
let subject = subject.map(|s| s.into());
let predicate = predicate.map(|p| p.into());
let object = object.map(|o| o.into());
let graph_name = graph_name.map(|g| g.map_or(ENCODED_DEFAULT_GRAPH, |g| g.into()));
Box::new(
self.inner
.quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.inner.decode_quad(&quad?)),
)
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.inner.contains(&quad.into())
}
fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()> {
let mut transaction = StoreRepositoryTransaction {
inner: self.inner.transaction(),
};
f(&mut transaction)?;
transaction.inner.commit()
}
fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.load_graph(reader, syntax, to_graph_name, base_iri)?;
transaction.inner.commit()
}
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.load_dataset(reader, syntax, base_iri)?;
transaction.inner.commit()
}
fn insert(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.insert(quad)?;
transaction.inner.commit()
}
fn remove(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.remove(quad)?;
transaction.inner.commit()
}
}
pub struct StoreRepositoryTransaction<T: StoreTransaction> {
inner: T,
}
impl<T: StoreTransaction> RepositoryTransaction for StoreRepositoryTransaction<T> {
fn load_graph(
&mut self,
reader: impl BufRead, reader: impl BufRead,
syntax: GraphSyntax, syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>, to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
let base_iri = base_iri.unwrap_or(""); let base_iri = base_iri.unwrap_or("");
match syntax { match syntax {
GraphSyntax::NTriples => { GraphSyntax::NTriples => {
self.load_from_triple_parser(NTriplesParser::new(reader)?, to_graph_name) load_from_triple_parser(store, NTriplesParser::new(reader)?, to_graph_name)
} }
GraphSyntax::Turtle => { GraphSyntax::Turtle => {
self.load_from_triple_parser(TurtleParser::new(reader, base_iri)?, to_graph_name) load_from_triple_parser(store, TurtleParser::new(reader, base_iri)?, to_graph_name)
} }
GraphSyntax::RdfXml => { GraphSyntax::RdfXml => {
self.load_from_triple_parser(RdfXmlParser::new(reader, base_iri)?, to_graph_name) load_from_triple_parser(store, RdfXmlParser::new(reader, base_iri)?, to_graph_name)
}
} }
} }
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let base_iri = base_iri.unwrap_or("");
match syntax {
DatasetSyntax::NQuads => self.load_from_quad_parser(NQuadsParser::new(reader)?),
DatasetSyntax::TriG => self.load_from_quad_parser(TriGParser::new(reader, base_iri)?),
}
}
fn insert(&mut self, quad: &Quad) -> Result<()> {
let quad = self.inner.encode_quad(quad)?;
self.inner.insert(&quad)
}
fn remove(&mut self, quad: &Quad) -> Result<()> {
let quad = quad.into();
self.inner.remove(&quad)
}
} }
impl<T: StoreTransaction> StoreRepositoryTransaction<T> { fn load_from_triple_parser<S: WritableEncodedStore, P: TriplesParser>(
fn load_from_triple_parser<P: TriplesParser>( store: &mut S,
&mut self,
mut parser: P, mut parser: P,
to_graph_name: Option<&NamedOrBlankNode>, to_graph_name: Option<&NamedOrBlankNode>,
) -> Result<()> ) -> Result<()>
where where
Error: From<P::Error>, Error: From<P::Error>,
{ {
let mut bnode_map = HashMap::default(); let mut bnode_map = HashMap::default();
let graph_name = if let Some(graph_name) = to_graph_name { let graph_name = if let Some(graph_name) = to_graph_name {
self.inner.encode_named_or_blank_node(graph_name)? store.encode_named_or_blank_node(graph_name)?
} else { } else {
EncodedTerm::DefaultGraph EncodedTerm::DefaultGraph
}; };
parser.parse_all(&mut move |t| { parser.parse_all(&mut move |t| {
let quad = self let quad = store.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
.inner store.insert_encoded(&quad)
.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
self.inner.insert(&quad)
}) })
}
fn load_dataset<S: WritableEncodedStore>(
store: &mut S,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let base_iri = base_iri.unwrap_or("");
match syntax {
DatasetSyntax::NQuads => load_from_quad_parser(store, NQuadsParser::new(reader)?),
DatasetSyntax::TriG => load_from_quad_parser(store, TriGParser::new(reader, base_iri)?),
} }
}
fn load_from_quad_parser<P: QuadsParser>(&mut self, mut parser: P) -> Result<()> fn load_from_quad_parser<S: WritableEncodedStore, P: QuadsParser>(
where store: &mut S,
mut parser: P,
) -> Result<()>
where
Error: From<P::Error>, Error: From<P::Error>,
{ {
let mut bnode_map = HashMap::default(); let mut bnode_map = HashMap::default();
parser.parse_all(&mut move |q| { parser.parse_all(&mut move |q| {
let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?; let quad = store.encode_rio_quad(q, &mut bnode_map)?;
self.inner.insert(&quad) store.insert_encoded(&quad)
}) })
}
} }

@ -1264,3 +1264,12 @@ fn test_encoding() {
assert_eq!(encoded, EncodedTerm::from(&term)); assert_eq!(encoded, EncodedTerm::from(&term));
} }
} }
#[test]
fn test_str_hash() {
assert_eq!(StrHash::new(""), EMPTY_STRING_ID);
assert_eq!(
StrHash::new("http://www.w3.org/1999/02/22-rdf-syntax-ns#langString"),
RDF_LANG_STRING_ID
);
}

@ -1,41 +1,42 @@
use crate::model::*;
use crate::sparql::{GraphPattern, PreparedQuery, QueryOptions, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::{Store, StoreConnection, StoreRepositoryConnection, StoreTransaction}; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore};
use crate::Error; use crate::{DatasetSyntax, Error, GraphSyntax, Result};
use crate::{Repository, Result};
use rocksdb::*; use rocksdb::*;
use std::io::Cursor; use std::io::{BufRead, Cursor};
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::mem::take; use std::mem::take;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
use std::sync::Arc;
/// `Repository` implementation based on the [RocksDB](https://rocksdb.org/) key-value store /// Store based on the [RocksDB](https://rocksdb.org/) key-value database.
/// It encodes a [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset) and allows to query and update it using SPARQL.
/// ///
/// To use it, the `"rocksdb"` feature needs to be activated. /// To use it, the `"rocksdb"` feature needs to be activated.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
/// use oxigraph::model::*; /// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, RocksDbRepository, Result}; /// use oxigraph::{Result, RocksDbStore};
/// use crate::oxigraph::sparql::{PreparedQuery, QueryOptions}; /// use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult};
/// use oxigraph::sparql::QueryResult;
/// # use std::fs::remove_dir_all; /// # use std::fs::remove_dir_all;
/// ///
/// # { /// # {
/// let repository = RocksDbRepository::open("example.db")?; /// let store = RocksDbStore::open("example.db")?;
/// let mut connection = repository.connection()?;
/// ///
/// // insertion /// // insertion
/// let ex = NamedNode::parse("http://example.com")?; /// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad)?; /// store.insert(&quad)?;
/// ///
/// // quad filter /// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect(); /// let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results?); /// assert_eq!(vec![quad], results?);
/// ///
/// // SPARQL query /// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; /// let prepared_query = store.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?; /// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results { /// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
@ -45,12 +46,11 @@ use std::str;
/// # remove_dir_all("example.db")?; /// # remove_dir_all("example.db")?;
/// # Result::Ok(()) /// # Result::Ok(())
/// ``` /// ```
pub struct RocksDbRepository { #[derive(Clone)]
inner: RocksDbStore, pub struct RocksDbStore {
db: Arc<DB>,
} }
pub type RocksDbRepositoryConnection<'a> = StoreRepositoryConnection<RocksDbStoreConnection<'a>>;
const ID2STR_CF: &str = "id2str"; const ID2STR_CF: &str = "id2str";
const SPOG_CF: &str = "spog"; const SPOG_CF: &str = "spog";
const POSG_CF: &str = "posg"; const POSG_CF: &str = "posg";
@ -67,13 +67,9 @@ const COLUMN_FAMILIES: [&str; 7] = [
const MAX_TRANSACTION_SIZE: usize = 1024; const MAX_TRANSACTION_SIZE: usize = 1024;
struct RocksDbStore {
db: DB,
}
#[derive(Clone)] #[derive(Clone)]
pub struct RocksDbStoreConnection<'a> { struct RocksDbStoreHandle<'a> {
store: &'a RocksDbStore, db: &'a DB,
id2str_cf: &'a ColumnFamily, id2str_cf: &'a ColumnFamily,
spog_cf: &'a ColumnFamily, spog_cf: &'a ColumnFamily,
posg_cf: &'a ColumnFamily, posg_cf: &'a ColumnFamily,
@ -83,47 +79,144 @@ pub struct RocksDbStoreConnection<'a> {
gosp_cf: &'a ColumnFamily, gosp_cf: &'a ColumnFamily,
} }
impl RocksDbRepository {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
inner: RocksDbStore::open(path)?,
})
}
}
impl<'a> Repository for &'a RocksDbRepository {
type Connection = RocksDbRepositoryConnection<'a>;
fn connection(self) -> Result<StoreRepositoryConnection<RocksDbStoreConnection<'a>>> {
Ok(self.inner.connection()?.into())
}
}
impl RocksDbStore { impl RocksDbStore {
fn open(path: impl AsRef<Path>) -> Result<Self> { /// Opens a `RocksDbStore`
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
let mut options = Options::default(); let mut options = Options::default();
options.create_if_missing(true); options.create_if_missing(true);
options.create_missing_column_families(true); options.create_missing_column_families(true);
options.set_compaction_style(DBCompactionStyle::Universal); options.set_compaction_style(DBCompactionStyle::Universal);
let new = Self { let new = Self {
db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?, db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?),
}; };
let mut transaction = (&new).connection()?.transaction(); let mut transaction = new.handle()?.auto_transaction();
transaction.set_first_strings()?; transaction.set_first_strings()?;
transaction.commit()?; transaction.commit()?;
Ok(new) Ok(new)
} }
}
impl<'a> Store for &'a RocksDbStore { /// Prepares a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) and returns an object that could be used to execute it.
type Connection = RocksDbStoreConnection<'a>; ///
/// See `MemoryStore` for a usage example.
fn connection(self) -> Result<RocksDbStoreConnection<'a>> { pub fn prepare_query<'a>(
Ok(RocksDbStoreConnection { &'a self,
store: self, query: &str,
options: QueryOptions<'_>,
) -> Result<impl PreparedQuery + 'a> {
SimplePreparedQuery::new((*self).clone(), query, options)
}
/// This is similar to `prepare_query`, but useful if a SPARQL query has already been parsed, which is the case when building `ServiceHandler`s for federated queries with `SERVICE` clauses. For examples, look in the tests.
pub fn prepare_query_from_pattern<'a>(
&'a self,
graph_pattern: &GraphPattern,
options: QueryOptions<'_>,
) -> Result<impl PreparedQuery + 'a> {
SimplePreparedQuery::new_from_pattern((*self).clone(), graph_pattern, options)
}
/// Retrieves quads with a filter on each quad component
///
/// See `MemoryStore` for a usage example.
#[allow(clippy::option_option)]
pub fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>,
object: Option<&Term>,
graph_name: Option<Option<&NamedOrBlankNode>>,
) -> Box<dyn Iterator<Item = Result<Quad>> + 'a>
where
Self: 'a,
{
let subject = subject.map(|s| s.into());
let predicate = predicate.map(|p| p.into());
let object = object.map(|o| o.into());
let graph_name = graph_name.map(|g| g.map_or(ENCODED_DEFAULT_GRAPH, |g| g.into()));
Box::new(
self.encoded_quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.decode_quad(&quad?)),
)
}
/// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> Result<bool> {
let quad = quad.into();
self.handle()?.contains(&quad)
}
/// Executes a transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the clusre returns `Err`.
///
/// See `MemoryStore` for a usage example.
pub fn transaction<'a>(
&'a self,
f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>,
) -> Result<()> {
let mut transaction = self.handle()?.transaction();
f(&mut transaction)?;
transaction.commit()
}
/// Loads a graph file (i.e. triples) into the store
///
/// Warning: This functions saves the triples in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// See `MemoryStore` for a usage example.
pub fn load_graph(
&self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?;
transaction.commit()
}
/// Loads a dataset file (i.e. quads) into the store.
///
/// Warning: This functions saves the quads in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// See `MemoryStore` for a usage example.
pub fn load_dataset(
&self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
load_dataset(&mut transaction, reader, syntax, base_iri)?;
transaction.commit()
}
/// Adds a quad to this store.
pub fn insert(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let quad = transaction.encode_quad(quad)?;
transaction.insert_encoded(&quad)?;
transaction.commit()
}
/// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle()?.auto_transaction();
let quad = quad.into();
transaction.remove_encoded(&quad)?;
transaction.commit()
}
fn handle<'a>(&'a self) -> Result<RocksDbStoreHandle<'a>> {
Ok(RocksDbStoreHandle {
db: &self.db,
id2str_cf: get_cf(&self.db, ID2STR_CF)?, id2str_cf: get_cf(&self.db, ID2STR_CF)?,
spog_cf: get_cf(&self.db, SPOG_CF)?, spog_cf: get_cf(&self.db, SPOG_CF)?,
posg_cf: get_cf(&self.db, POSG_CF)?, posg_cf: get_cf(&self.db, POSG_CF)?,
@ -135,93 +228,64 @@ impl<'a> Store for &'a RocksDbStore {
} }
} }
impl StrLookup for RocksDbStoreConnection<'_> { impl StrLookup for RocksDbStore {
fn get_str(&self, id: StrHash) -> Result<Option<String>> { fn get_str(&self, id: StrHash) -> Result<Option<String>> {
Ok(self Ok(self
.store
.db .db
.get_cf(self.id2str_cf, &id.to_le_bytes())? .get_cf(get_cf(&self.db, ID2STR_CF)?, &id.to_le_bytes())?
.map(String::from_utf8) .map(String::from_utf8)
.transpose()?) .transpose()?)
} }
} }
impl<'a> StoreConnection for RocksDbStoreConnection<'a> { impl ReadableEncodedStore for RocksDbStore {
type Transaction = RocksDbStoreTransaction<'a>; fn encoded_quads_for_pattern<'a>(
type AutoTransaction = RocksDbStoreAutoTransaction<'a>; &'a self,
fn transaction(&self) -> RocksDbStoreTransaction<'a> {
RocksDbStoreTransaction {
inner: RocksDbStoreInnerTransaction {
connection: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn auto_transaction(&self) -> RocksDbStoreAutoTransaction<'a> {
RocksDbStoreAutoTransaction {
inner: RocksDbStoreInnerTransaction {
connection: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
buffer.write_spog_quad(quad)?;
Ok(self
.store
.db
.get_pinned_cf(self.spog_cf, &buffer)?
.is_some())
}
fn quads_for_pattern<'b>(
&'b self,
subject: Option<EncodedTerm>, subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>, object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>, graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> { ) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
let handle = match self.handle() {
Ok(handle) => handle,
Err(error) => return Box::new(once(Err(error))),
};
match subject { match subject {
Some(subject) => match predicate { Some(subject) => match predicate {
Some(predicate) => match object { Some(predicate) => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => { Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name); let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) { match handle.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))), Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()), Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))), Err(error) => Box::new(once(Err(error))),
} }
} }
None => wrap_error( None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object), handle.quads_for_subject_predicate_object(subject, predicate, object),
), ),
}, },
None => match graph_name { None => match graph_name {
Some(graph_name) => wrap_error( Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name), handle
.quads_for_subject_predicate_graph(subject, predicate, graph_name),
), ),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), None => wrap_error(handle.quads_for_subject_predicate(subject, predicate)),
}, },
}, },
None => match object { None => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => wrap_error( Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name), handle.quads_for_subject_object_graph(subject, object, graph_name),
), ),
None => wrap_error(self.quads_for_subject_object(subject, object)), None => wrap_error(handle.quads_for_subject_object(subject, object)),
}, },
None => match graph_name { None => match graph_name {
Some(graph_name) => { Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name)) wrap_error(handle.quads_for_subject_graph(subject, graph_name))
} }
None => wrap_error(self.quads_for_subject(subject)), None => wrap_error(handle.quads_for_subject(subject)),
}, },
}, },
}, },
@ -229,27 +293,27 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
Some(predicate) => match object { Some(predicate) => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => wrap_error( Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name), handle.quads_for_predicate_object_graph(predicate, object, graph_name),
), ),
None => wrap_error(self.quads_for_predicate_object(predicate, object)), None => wrap_error(handle.quads_for_predicate_object(predicate, object)),
}, },
None => match graph_name { None => match graph_name {
Some(graph_name) => { Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) wrap_error(handle.quads_for_predicate_graph(predicate, graph_name))
} }
None => wrap_error(self.quads_for_predicate(predicate)), None => wrap_error(handle.quads_for_predicate(predicate)),
}, },
}, },
None => match object { None => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => { Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name)) wrap_error(handle.quads_for_object_graph(object, graph_name))
} }
None => wrap_error(self.quads_for_object(object)), None => wrap_error(handle.quads_for_object(object)),
}, },
None => match graph_name { None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), Some(graph_name) => wrap_error(handle.quads_for_graph(graph_name)),
None => Box::new(self.quads()), None => Box::new(handle.quads()),
}, },
}, },
}, },
@ -257,7 +321,33 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
} }
} }
impl<'a> RocksDbStoreConnection<'a> { impl<'a> RocksDbStoreHandle<'a> {
fn transaction(&self) -> RocksDbTransaction<'a> {
RocksDbTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> {
RocksDbAutoTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
buffer.write_spog_quad(quad)?;
Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some())
}
fn quads(&self) -> impl Iterator<Item = Result<EncodedQuad>> + 'a { fn quads(&self) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.spog_quads(Vec::default()) self.spog_quads(Vec::default())
} }
@ -416,7 +506,7 @@ impl<'a> RocksDbStoreConnection<'a> {
prefix: Vec<u8>, prefix: Vec<u8>,
decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a, decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a,
) -> impl Iterator<Item = Result<EncodedQuad>> + 'a { ) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
let mut iter = self.store.db.raw_iterator_cf(cf); let mut iter = self.db.raw_iterator_cf(cf);
iter.seek(&prefix); iter.seek(&prefix);
DecodingIndexIterator { DecodingIndexIterator {
iter, iter,
@ -426,112 +516,150 @@ impl<'a> RocksDbStoreConnection<'a> {
} }
} }
pub struct RocksDbStoreTransaction<'a> { /// Allows to insert and delete quads during a transaction with the `RocksDbStore`.
inner: RocksDbStoreInnerTransaction<'a>, pub struct RocksDbTransaction<'a> {
inner: RocksDbInnerTransaction<'a>,
} }
impl StrContainer for RocksDbStoreTransaction<'_> { impl StrContainer for RocksDbTransaction<'_> {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.inner.insert_str(key, value); self.inner.insert_str(key, value);
Ok(()) Ok(())
} }
} }
impl StoreTransaction for RocksDbStoreTransaction<'_> { impl WritableEncodedStore for RocksDbTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.insert(quad) self.inner.insert(quad)
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove(quad) self.inner.remove(quad)
} }
}
impl RocksDbTransaction<'_> {
/// Loads a graph file (i.e. triples) into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// See `MemoryTransaction` for a usage example.
pub fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
load_graph(self, reader, syntax, to_graph_name, base_iri)
}
/// Loads a dataset file (i.e. quads) into the store. into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// See `MemoryTransaction` for a usage example.
pub fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
load_dataset(self, reader, syntax, base_iri)
}
/// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) -> Result<()> {
let quad = self.encode_quad(quad)?;
self.insert_encoded(&quad)
}
/// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) -> Result<()> {
let quad = quad.into();
self.remove_encoded(&quad)
}
fn commit(self) -> Result<()> { fn commit(self) -> Result<()> {
self.inner.commit() self.inner.commit()
} }
} }
pub struct RocksDbStoreAutoTransaction<'a> { pub struct RocksDbAutoTransaction<'a> {
inner: RocksDbStoreInnerTransaction<'a>, inner: RocksDbInnerTransaction<'a>,
} }
impl StrContainer for RocksDbStoreAutoTransaction<'_> { impl StrContainer for RocksDbAutoTransaction<'_> {
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.inner.insert_str(key, value); self.inner.insert_str(key, value);
Ok(()) Ok(())
} }
} }
impl StoreTransaction for RocksDbStoreAutoTransaction<'_> { impl WritableEncodedStore for RocksDbAutoTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.insert(quad)?; self.inner.insert(quad)?;
self.commit_if_big() self.commit_if_big()
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove(quad)?; self.inner.remove(quad)?;
self.commit_if_big() self.commit_if_big()
} }
}
impl RocksDbAutoTransaction<'_> {
fn commit(self) -> Result<()> { fn commit(self) -> Result<()> {
self.inner.commit() self.inner.commit()
} }
}
impl RocksDbStoreAutoTransaction<'_> {
fn commit_if_big(&mut self) -> Result<()> { fn commit_if_big(&mut self) -> Result<()> {
if self.inner.batch.len() > MAX_TRANSACTION_SIZE { if self.inner.batch.len() > MAX_TRANSACTION_SIZE {
self.inner self.inner.handle.db.write(take(&mut self.inner.batch))?;
.connection
.store
.db
.write(take(&mut self.inner.batch))?;
} }
Ok(()) Ok(())
} }
} }
struct RocksDbStoreInnerTransaction<'a> { struct RocksDbInnerTransaction<'a> {
connection: RocksDbStoreConnection<'a>, handle: RocksDbStoreHandle<'a>,
batch: WriteBatch, batch: WriteBatch,
buffer: Vec<u8>, buffer: Vec<u8>,
} }
impl RocksDbStoreInnerTransaction<'_> { impl RocksDbInnerTransaction<'_> {
fn insert_str(&mut self, key: StrHash, value: &str) { fn insert_str(&mut self, key: StrHash, value: &str) {
self.batch self.batch
.put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value) .put_cf(self.handle.id2str_cf, &key.to_le_bytes(), value)
} }
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.buffer.write_spog_quad(quad)?; self.buffer.write_spog_quad(quad)?;
self.batch self.batch.put_cf(self.handle.spog_cf, &self.buffer, &[]);
.put_cf(self.connection.spog_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_posg_quad(quad)?; self.buffer.write_posg_quad(quad)?;
self.batch self.batch.put_cf(self.handle.posg_cf, &self.buffer, &[]);
.put_cf(self.connection.posg_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_ospg_quad(quad)?; self.buffer.write_ospg_quad(quad)?;
self.batch self.batch.put_cf(self.handle.ospg_cf, &self.buffer, &[]);
.put_cf(self.connection.ospg_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gspo_quad(quad)?; self.buffer.write_gspo_quad(quad)?;
self.batch self.batch.put_cf(self.handle.gspo_cf, &self.buffer, &[]);
.put_cf(self.connection.gspo_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gpos_quad(quad)?; self.buffer.write_gpos_quad(quad)?;
self.batch self.batch.put_cf(self.handle.gpos_cf, &self.buffer, &[]);
.put_cf(self.connection.gpos_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gosp_quad(quad)?; self.buffer.write_gosp_quad(quad)?;
self.batch self.batch.put_cf(self.handle.gosp_cf, &self.buffer, &[]);
.put_cf(self.connection.gosp_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
@ -539,34 +667,34 @@ impl RocksDbStoreInnerTransaction<'_> {
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.buffer.write_spog_quad(quad)?; self.buffer.write_spog_quad(quad)?;
self.batch.delete_cf(self.connection.spog_cf, &self.buffer); self.batch.delete_cf(self.handle.spog_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_posg_quad(quad)?; self.buffer.write_posg_quad(quad)?;
self.batch.delete_cf(self.connection.posg_cf, &self.buffer); self.batch.delete_cf(self.handle.posg_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_ospg_quad(quad)?; self.buffer.write_ospg_quad(quad)?;
self.batch.delete_cf(self.connection.ospg_cf, &self.buffer); self.batch.delete_cf(self.handle.ospg_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gspo_quad(quad)?; self.buffer.write_gspo_quad(quad)?;
self.batch.delete_cf(self.connection.gspo_cf, &self.buffer); self.batch.delete_cf(self.handle.gspo_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gpos_quad(quad)?; self.buffer.write_gpos_quad(quad)?;
self.batch.delete_cf(self.connection.gpos_cf, &self.buffer); self.batch.delete_cf(self.handle.gpos_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gosp_quad(quad)?; self.buffer.write_gosp_quad(quad)?;
self.batch.delete_cf(self.connection.gosp_cf, &self.buffer); self.batch.delete_cf(self.handle.gosp_cf, &self.buffer);
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
} }
fn commit(self) -> Result<()> { fn commit(self) -> Result<()> {
self.connection.store.db.write(self.batch)?; self.handle.db.write(self.batch)?;
Ok(()) Ok(())
} }
} }
@ -631,7 +759,7 @@ impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator
} }
#[test] #[test]
fn repository() -> Result<()> { fn store() -> Result<()> {
use crate::model::*; use crate::model::*;
use crate::*; use crate::*;
use rand::random; use rand::random;
@ -653,94 +781,93 @@ fn repository() -> Result<()> {
repo_path.push(random::<u128>().to_string()); repo_path.push(random::<u128>().to_string());
{ {
let repository = RocksDbRepository::open(&repo_path)?; let store = RocksDbStore::open(&repo_path)?;
let mut connection = repository.connection()?; store.insert(&main_quad)?;
connection.insert(&main_quad)?;
for t in &all_o { for t in &all_o {
connection.insert(t)?; store.insert(t)?;
} }
let target = vec![main_quad]; let target = vec![main_quad];
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, None, None, None) .quads_for_pattern(None, None, None, None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), None, None, None) .quads_for_pattern(Some(&main_s), None, None, None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), Some(&main_p), None, None) .quads_for_pattern(Some(&main_s), Some(&main_p), None, None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), None) .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), Some(None)) .quads_for_pattern(Some(&main_s), Some(&main_p), Some(&main_o), Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), Some(&main_p), None, Some(None)) .quads_for_pattern(Some(&main_s), Some(&main_p), None, Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), None, Some(&main_o), None) .quads_for_pattern(Some(&main_s), None, Some(&main_o), None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), None, Some(&main_o), Some(None)) .quads_for_pattern(Some(&main_s), None, Some(&main_o), Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(Some(&main_s), None, None, Some(None)) .quads_for_pattern(Some(&main_s), None, None, Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, Some(&main_p), None, None) .quads_for_pattern(None, Some(&main_p), None, None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, Some(&main_p), Some(&main_o), None) .quads_for_pattern(None, Some(&main_p), Some(&main_o), None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, None, Some(&main_o), None) .quads_for_pattern(None, None, Some(&main_o), None)
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, None, None, Some(None)) .quads_for_pattern(None, None, None, Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
all_o all_o
); );
assert_eq!( assert_eq!(
connection store
.quads_for_pattern(None, Some(&main_p), Some(&main_o), Some(None)) .quads_for_pattern(None, Some(&main_p), Some(&main_o), Some(None))
.collect::<Result<Vec<_>>>()?, .collect::<Result<Vec<_>>>()?,
target target

@ -202,25 +202,20 @@ fn literal(str: String) -> Term {
Term::Literal(Literal::new_simple_literal(str)) Term::Literal(Literal::new_simple_literal(str))
} }
fn make_repository(reader: impl BufRead) -> Result<MemoryRepository> { fn make_store(reader: impl BufRead) -> Result<MemoryStore> {
let repository = MemoryRepository::default(); let store = MemoryStore::default();
let mut connection = repository.connection()?; store
connection
.load_graph(reader, GraphSyntax::NTriples, None, None) .load_graph(reader, GraphSyntax::NTriples, None, None)
.unwrap(); .unwrap();
Ok(repository) Ok(store)
} }
fn query_repository<'a>( fn query_store<'a>(
repository: MemoryRepository, store: MemoryStore,
query: String, query: String,
options: QueryOptions<'a>, options: QueryOptions<'a>,
) -> Result<BindingsIterator<'a>> { ) -> Result<BindingsIterator<'a>> {
match repository match store.prepare_query(&query, options)?.exec()? {
.connection()?
.prepare_query(&query, options)?
.exec()?
{
QueryResult::Bindings(iterator) => { QueryResult::Bindings(iterator) => {
let (varaibles, iter) = iterator.destruct(); let (varaibles, iter) = iterator.destruct();
let collected = iter.collect::<Vec<_>>(); let collected = iter.collect::<Vec<_>>();
@ -233,13 +228,12 @@ fn query_repository<'a>(
} }
} }
fn pattern_repository<'a>( fn pattern_store<'a>(
repository: MemoryRepository, store: MemoryStore,
pattern: &'a GraphPattern, pattern: &'a GraphPattern,
options: QueryOptions<'a>, options: QueryOptions<'a>,
) -> Result<BindingsIterator<'a>> { ) -> Result<BindingsIterator<'a>> {
match repository match store
.connection()?
.prepare_query_from_pattern(&pattern, options)? .prepare_query_from_pattern(&pattern, options)?
.exec()? .exec()?
{ {
@ -260,8 +254,8 @@ fn do_query<'a>(
query: String, query: String,
options: QueryOptions<'a>, options: QueryOptions<'a>,
) -> Result<BindingsIterator<'a>> { ) -> Result<BindingsIterator<'a>> {
let repository = make_repository(reader)?; let store = make_store(reader)?;
query_repository(repository, query, options) query_store(store, query, options)
} }
fn do_pattern<'a>( fn do_pattern<'a>(
@ -269,6 +263,6 @@ fn do_pattern<'a>(
pattern: &'a GraphPattern, pattern: &'a GraphPattern,
options: QueryOptions<'a>, options: QueryOptions<'a>,
) -> Result<BindingsIterator<'a>> { ) -> Result<BindingsIterator<'a>> {
let repository = make_repository(reader)?; let store = make_store(reader)?;
pattern_repository(repository, pattern, options) pattern_store(store, pattern, options)
} }

@ -148,20 +148,18 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
if test_blacklist.contains(&test.id) { if test_blacklist.contains(&test.id) {
Ok(()) Ok(())
} else if test.kind == "QueryEvaluationTest" { } else if test.kind == "QueryEvaluationTest" {
let repository = MemoryRepository::default(); let store = MemoryStore::default();
if let Some(data) = &test.data { if let Some(data) = &test.data {
load_graph_to_repository(&data, &mut repository.connection()?, None)?; load_graph_to_store(&data, &store, None)?;
} }
for graph_data in &test.graph_data { for graph_data in &test.graph_data {
load_graph_to_repository( load_graph_to_store(
&graph_data, &graph_data,
&mut repository.connection()?, &store,
Some(&NamedNode::parse(graph_data)?.into()), Some(&NamedNode::parse(graph_data)?.into()),
)?; )?;
} }
match repository match store.prepare_query(&read_file_to_string(&test.query)?, QueryOptions::default().with_base_iri(&test.query).with_service_handler(StaticServiceHandler::new(&test.service_data)?))
.connection()?
.prepare_query(&read_file_to_string(&test.query)?, QueryOptions::default().with_base_iri(&test.query).with_service_handler(StaticServiceHandler::new(&test.service_data)?))
{ {
Err(error) => Err(Error::msg(format!( Err(error) => Err(Error::msg(format!(
"Failure to parse query of {} with error: {}", "Failure to parse query of {} with error: {}",
@ -188,7 +186,7 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
expected_graph, expected_graph,
actual_graph, actual_graph,
Query::parse(&read_file_to_string(&test.query)?, Some(&test.query)).unwrap(), Query::parse(&read_file_to_string(&test.query)?, Some(&test.query)).unwrap(),
repository_to_string(&repository) store_to_string(&store)
))) )))
} }
} }
@ -209,29 +207,25 @@ fn sparql_w3c_query_evaluation_testsuite() -> Result<()> {
Ok(()) Ok(())
} }
fn repository_to_string(repository: impl Repository) -> String { fn store_to_string(store: &MemoryStore) -> String {
repository store
.connection()
.unwrap()
.quads_for_pattern(None, None, None, None) .quads_for_pattern(None, None, None, None)
.map(|q| q.unwrap().to_string() + "\n") .map(|q| q.unwrap().to_string() + "\n")
.collect() .collect()
} }
fn load_graph(url: &str) -> Result<SimpleGraph> { fn load_graph(url: &str) -> Result<SimpleGraph> {
let repository = MemoryRepository::default(); let store = MemoryStore::default();
load_graph_to_repository(url, &mut repository.connection().unwrap(), None)?; load_graph_to_store(url, &store, None)?;
Ok(repository Ok(store
.connection()
.unwrap()
.quads_for_pattern(None, None, None, Some(None)) .quads_for_pattern(None, None, None, Some(None))
.map(|q| q.unwrap().into_triple()) .map(|q| q.unwrap().into_triple())
.collect()) .collect())
} }
fn load_graph_to_repository( fn load_graph_to_store(
url: &str, url: &str,
connection: &mut <&MemoryRepository as Repository>::Connection, store: &MemoryStore,
to_graph_name: Option<&NamedOrBlankNode>, to_graph_name: Option<&NamedOrBlankNode>,
) -> Result<()> { ) -> Result<()> {
let syntax = if url.ends_with(".nt") { let syntax = if url.ends_with(".nt") {
@ -246,23 +240,22 @@ fn load_graph_to_repository(
url url
))); )));
}; };
connection.load_graph(read_file(url)?, syntax, to_graph_name, Some(url)) store.load_graph(read_file(url)?, syntax, to_graph_name, Some(url))
} }
fn load_sparql_query_result_graph(url: &str) -> Result<SimpleGraph> { fn load_sparql_query_result_graph(url: &str) -> Result<SimpleGraph> {
let repository = MemoryRepository::default(); let store = MemoryStore::default();
let mut connection = repository.connection()?;
if url.ends_with(".srx") { if url.ends_with(".srx") {
for t in to_graph( for t in to_graph(
QueryResult::read(read_file(url)?, QueryResultSyntax::Xml)?, QueryResult::read(read_file(url)?, QueryResultSyntax::Xml)?,
false, false,
)? { )? {
connection.insert(&t.in_graph(None))?; store.insert(&t.in_graph(None))?;
} }
} else { } else {
load_graph_to_repository(url, &mut connection, None)?; load_graph_to_store(url, &store, None)?;
} }
Ok(connection Ok(store
.quads_for_pattern(None, None, None, Some(None)) .quads_for_pattern(None, None, None, Some(None))
.map(|q| q.unwrap().into_triple()) .map(|q| q.unwrap().into_triple())
.collect()) .collect())
@ -701,7 +694,7 @@ impl<'a> Iterator for RdfListIterator<'a> {
#[derive(Clone)] #[derive(Clone)]
struct StaticServiceHandler { struct StaticServiceHandler {
services: Arc<HashMap<NamedNode, MemoryRepository>>, services: Arc<HashMap<NamedNode, MemoryStore>>,
} }
impl StaticServiceHandler { impl StaticServiceHandler {
@ -712,9 +705,9 @@ impl StaticServiceHandler {
.iter() .iter()
.map(|(name, data)| { .map(|(name, data)| {
let name = NamedNode::parse(name)?; let name = NamedNode::parse(name)?;
let repository = MemoryRepository::default(); let store = MemoryStore::default();
load_graph_to_repository(&data, &mut repository.connection()?, None)?; load_graph_to_store(&data, &store, None)?;
Ok((name, repository)) Ok((name, store))
}) })
.collect::<Result<_>>()?, .collect::<Result<_>>()?,
), ),
@ -732,7 +725,6 @@ impl ServiceHandler for StaticServiceHandler {
.services .services
.get(service_name) .get(service_name)
.ok_or_else(|| Error::msg(format!("Service {} not found", service_name)))? .ok_or_else(|| Error::msg(format!("Service {} not found", service_name)))?
.connection()?
.prepare_query_from_pattern( .prepare_query_from_pattern(
&graph_pattern, &graph_pattern,
QueryOptions::default().with_service_handler(self.clone()), QueryOptions::default().with_service_handler(self.clone()),

@ -2,27 +2,24 @@
mod test { mod test {
use oxigraph::model::*; use oxigraph::model::*;
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult};
use oxigraph::{MemoryRepository, Repository, RepositoryConnection, Result}; use oxigraph::{MemoryStore, Result};
use std::str::FromStr; use std::str::FromStr;
use wasm_bindgen_test::*; use wasm_bindgen_test::*;
#[wasm_bindgen_test] #[wasm_bindgen_test]
fn simple() { fn simple() {
let repository = MemoryRepository::default(); let store = MemoryStore::default();
let mut connection = repository.connection().unwrap();
// insertion // insertion
let ex = NamedNode::parse("http://example.com").unwrap(); let ex = NamedNode::parse("http://example.com").unwrap();
let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
connection.insert(&quad).unwrap(); store.insert(&quad).unwrap();
// quad filter // quad filter
let results: Result<Vec<Quad>> = connection let results: Result<Vec<Quad>> = store.quads_for_pattern(None, None, None, None).collect();
.quads_for_pattern(None, None, None, None)
.collect();
assert_eq!(vec![quad], results.unwrap()); assert_eq!(vec![quad], results.unwrap());
// SPARQL query // SPARQL query
let prepared_query = connection let prepared_query = store
.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()) .prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())
.unwrap(); .unwrap();
let results = prepared_query.exec().unwrap(); let results = prepared_query.exec().unwrap();
@ -36,9 +33,7 @@ mod test {
#[wasm_bindgen_test] #[wasm_bindgen_test]
fn now() { fn now() {
if let QueryResult::Bindings(results) = MemoryRepository::default() if let QueryResult::Bindings(results) = MemoryStore::default()
.connection()
.unwrap()
.prepare_query( .prepare_query(
"SELECT (YEAR(NOW()) AS ?y) WHERE {}", "SELECT (YEAR(NOW()) AS ?y) WHERE {}",
QueryOptions::default(), QueryOptions::default(),

@ -14,15 +14,11 @@ use async_std::future::Future;
use async_std::io::{BufRead, Read}; use async_std::io::{BufRead, Read};
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::task::{block_on, spawn, spawn_blocking}; use async_std::task::{block_on, spawn, spawn_blocking};
use http_types::headers::HeaderName; use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{ use oxigraph::{DatasetSyntax, FileSyntax, GraphSyntax, RocksDbStore};
DatasetSyntax, FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection,
RocksDbRepository,
};
use std::str::FromStr; use std::str::FromStr;
use url::form_urlencoded; use url::form_urlencoded;
@ -37,41 +33,24 @@ struct Args {
#[argh(option, short = 'b', default = "\"localhost:7878\".to_string()")] #[argh(option, short = 'b', default = "\"localhost:7878\".to_string()")]
bind: String, bind: String,
/// directory in which persist the data. By default data are kept in memory /// directory in which persist the data
#[argh(option, short = 'f')] #[argh(option, short = 'f')]
file: Option<String>, file: String,
} }
#[async_std::main] #[async_std::main]
pub async fn main() -> Result<()> { pub async fn main() -> Result<()> {
let args: Args = argh::from_env(); let args: Args = argh::from_env();
let store = RocksDbStore::open(args.file)?;
if let Some(file) = args.file { println!("Listening for requests at http://{}", &args.bind);
main_with_dataset(Arc::new(RocksDbRepository::open(file)?), args.bind).await http_server(args.bind, move |request| {
} else { handle_request(request, store.clone())
main_with_dataset(Arc::new(MemoryRepository::default()), args.bind).await
}
}
async fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, host: String) -> Result<()>
where
for<'a> &'a R: Repository,
{
println!("Listening for requests at http://{}", &host);
http_server(host, move |request| {
handle_request(request, Arc::clone(&repository))
}) })
.await .await
} }
async fn handle_request<R: Send + Sync + 'static>( async fn handle_request(request: Request, store: RocksDbStore) -> Result<Response> {
request: Request,
repository: Arc<R>,
) -> Result<Response>
where
for<'a> &'a R: Repository,
{
let mut response = match (request.url().path(), request.method()) { let mut response = match (request.url().path(), request.method()) {
("/", Method::Get) => { ("/", Method::Get) => {
let mut response = Response::new(StatusCode::Ok); let mut response = Response::new(StatusCode::Ok);
@ -83,20 +62,11 @@ where
if let Some(content_type) = request.content_type() { if let Some(content_type) = request.content_type() {
match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) { match if let Some(format) = GraphSyntax::from_mime_type(essence(&content_type)) {
spawn_blocking(move || { spawn_blocking(move || {
repository.connection()?.load_graph( store.load_graph(SyncAsyncBufReader::from(request), format, None, None)
SyncAsyncBufReader::from(request),
format,
None,
None,
)
}) })
} else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) { } else if let Some(format) = DatasetSyntax::from_mime_type(essence(&content_type)) {
spawn_blocking(move || { spawn_blocking(move || {
repository.connection()?.load_dataset( store.load_dataset(SyncAsyncBufReader::from(request), format, None)
SyncAsyncBufReader::from(request),
format,
None,
)
}) })
} else { } else {
return Ok(simple_response( return Ok(simple_response(
@ -119,7 +89,7 @@ where
} }
("/query", Method::Get) => { ("/query", Method::Get) => {
evaluate_urlencoded_sparql_query( evaluate_urlencoded_sparql_query(
repository, store,
request.url().query().unwrap_or("").as_bytes().to_vec(), request.url().query().unwrap_or("").as_bytes().to_vec(),
request, request,
) )
@ -135,7 +105,7 @@ where
.take(MAX_SPARQL_BODY_SIZE) .take(MAX_SPARQL_BODY_SIZE)
.read_to_string(&mut buffer) .read_to_string(&mut buffer)
.await?; .await?;
evaluate_sparql_query(repository, buffer, request).await? evaluate_sparql_query(store, buffer, request).await?
} else if essence(&content_type) == "application/x-www-form-urlencoded" { } else if essence(&content_type) == "application/x-www-form-urlencoded" {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let mut request = request; let mut request = request;
@ -144,7 +114,7 @@ where
.take(MAX_SPARQL_BODY_SIZE) .take(MAX_SPARQL_BODY_SIZE)
.read_to_end(&mut buffer) .read_to_end(&mut buffer)
.await?; .await?;
evaluate_urlencoded_sparql_query(repository, buffer, request).await? evaluate_urlencoded_sparql_query(store, buffer, request).await?
} else { } else {
simple_response( simple_response(
StatusCode::UnsupportedMediaType, StatusCode::UnsupportedMediaType,
@ -172,16 +142,13 @@ fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
response response
} }
async fn evaluate_urlencoded_sparql_query<R: Send + Sync + 'static>( async fn evaluate_urlencoded_sparql_query(
repository: Arc<R>, store: RocksDbStore,
encoded: Vec<u8>, encoded: Vec<u8>,
request: Request, request: Request,
) -> Result<Response> ) -> Result<Response> {
where
for<'a> &'a R: Repository,
{
if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") { if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(repository, query.to_string(), request).await evaluate_sparql_query(store, query.to_string(), request).await
} else { } else {
Ok(simple_response( Ok(simple_response(
StatusCode::BadRequest, StatusCode::BadRequest,
@ -190,18 +157,14 @@ where
} }
} }
async fn evaluate_sparql_query<R: Send + Sync + 'static>( async fn evaluate_sparql_query(
repository: Arc<R>, store: RocksDbStore,
query: String, query: String,
request: Request, request: Request,
) -> Result<Response> ) -> Result<Response> {
where
for<'a> &'a R: Repository,
{
spawn_blocking(move || { spawn_blocking(move || {
//TODO: stream //TODO: stream
let query = repository let query = store
.connection()?
.prepare_query(&query, QueryOptions::default()) .prepare_query(&query, QueryOptions::default())
.map_err(|e| { .map_err(|e| {
let mut e = Error::from(e); let mut e = Error::from(e);
@ -354,10 +317,13 @@ impl<R: BufRead + Unpin> std::io::BufRead for SyncAsyncBufReader<R> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use crate::handle_request; use crate::handle_request;
use async_std::sync::Arc;
use async_std::task::block_on; use async_std::task::block_on;
use http_types::{Method, Request, StatusCode, Url}; use http_types::{Method, Request, StatusCode, Url};
use oxigraph::MemoryRepository; use oxigraph::RocksDbStore;
use std::collections::hash_map::DefaultHasher;
use std::env::temp_dir;
use std::fs::remove_dir_all;
use std::hash::{Hash, Hasher};
#[test] #[test]
fn get_ui() { fn get_ui() {
@ -458,13 +424,20 @@ mod tests {
} }
fn exec(request: Request, expected_status: StatusCode) { fn exec(request: Request, expected_status: StatusCode) {
let repository = Arc::new(MemoryRepository::default()); let mut path = temp_dir();
path.push("temp-oxigraph-server-test");
let mut s = DefaultHasher::new();
format!("{:?}", request).hash(&mut s);
path.push(&s.finish().to_string());
let store = RocksDbStore::open(&path).unwrap();
assert_eq!( assert_eq!(
match block_on(handle_request(request, Arc::clone(&repository))) { match block_on(handle_request(request, store)) {
Ok(r) => r.status(), Ok(r) => r.status(),
Err(e) => e.status(), Err(e) => e.status(),
}, },
expected_status expected_status
); );
remove_dir_all(&path).unwrap()
} }
} }

@ -6,7 +6,7 @@ use http_client::h1::H1Client;
use http_client::HttpClient; use http_client::HttpClient;
use http_types::{Method, Request, Result}; use http_types::{Method, Request, Result};
use oxigraph::model::NamedNode; use oxigraph::model::NamedNode;
use oxigraph::{GraphSyntax, Repository, RepositoryConnection, RepositoryTransaction}; use oxigraph::{GraphSyntax, RocksDbStore};
use serde_json::Value; use serde_json::Value;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::io::{BufReader, Cursor, Read}; use std::io::{BufReader, Cursor, Read};
@ -14,8 +14,8 @@ use std::thread::sleep;
use std::time::Duration; use std::time::Duration;
use url::{form_urlencoded, Url}; use url::{form_urlencoded, Url};
pub struct WikibaseLoader<R: Repository + Copy> { pub struct WikibaseLoader {
repository: R, store: RocksDbStore,
api_url: Url, api_url: Url,
entity_data_url: Url, entity_data_url: Url,
client: H1Client, client: H1Client,
@ -25,9 +25,9 @@ pub struct WikibaseLoader<R: Repository + Copy> {
start: DateTime<Utc>, start: DateTime<Utc>,
} }
impl<R: Repository + Copy> WikibaseLoader<R> { impl WikibaseLoader {
pub fn new( pub fn new(
repository: R, store: RocksDbStore,
api_url: &str, api_url: &str,
pages_base_url: &str, pages_base_url: &str,
namespaces: &[u32], namespaces: &[u32],
@ -35,7 +35,7 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
frequency: Duration, frequency: Duration,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
repository, store,
api_url: Url::parse(api_url)?, api_url: Url::parse(api_url)?,
entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?, entity_data_url: Url::parse(&(pages_base_url.to_owned() + "Special:EntityData"))?,
client: H1Client::new(), client: H1Client::new(),
@ -229,10 +229,10 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
} }
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
let connection = self.repository.connection()?;
let graph_name = NamedNode::parse(uri)?.into(); let graph_name = NamedNode::parse(uri)?.into();
connection.transaction(|transaction| { self.store.transaction(|transaction| {
let to_remove = connection let to_remove = self
.store
.quads_for_pattern(None, None, None, Some(Some(&graph_name))) .quads_for_pattern(None, None, None, Some(Some(&graph_name)))
.collect::<oxigraph::Result<Vec<_>>>()?; .collect::<oxigraph::Result<Vec<_>>>()?;
for q in to_remove { for q in to_remove {

@ -14,14 +14,11 @@ use argh::FromArgs;
use async_std::future::Future; use async_std::future::Future;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
use async_std::prelude::*; use async_std::prelude::*;
use async_std::sync::Arc;
use async_std::task::{spawn, spawn_blocking}; use async_std::task::{spawn, spawn_blocking};
use http_types::headers::HeaderName; use http_types::headers::HeaderName;
use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode}; use http_types::{headers, Body, Error, Method, Mime, Request, Response, Result, StatusCode};
use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax}; use oxigraph::sparql::{PreparedQuery, QueryOptions, QueryResult, QueryResultSyntax};
use oxigraph::{ use oxigraph::{FileSyntax, GraphSyntax, RocksDbStore};
FileSyntax, GraphSyntax, MemoryRepository, Repository, RepositoryConnection, RocksDbRepository,
};
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use url::form_urlencoded; use url::form_urlencoded;
@ -38,9 +35,9 @@ struct Args {
#[argh(option, short = 'b', default = "\"localhost:7878\".to_string()")] #[argh(option, short = 'b', default = "\"localhost:7878\".to_string()")]
bind: String, bind: String,
/// directory in which persist the data. By default data are kept in memory /// directory in which persist the data
#[argh(option, short = 'f')] #[argh(option, short = 'f')]
file: Option<String>, file: String,
#[argh(option)] #[argh(option)]
/// base URL of the MediaWiki API like https://www.wikidata.org/w/api.php /// base URL of the MediaWiki API like https://www.wikidata.org/w/api.php
@ -63,19 +60,7 @@ struct Args {
pub async fn main() -> Result<()> { pub async fn main() -> Result<()> {
let args: Args = argh::from_env(); let args: Args = argh::from_env();
let file = args.file.clone(); let store = RocksDbStore::open(args.file)?;
if let Some(file) = file {
main_with_dataset(Arc::new(RocksDbRepository::open(file)?), args).await
} else {
main_with_dataset(Arc::new(MemoryRepository::default()), args).await
}
}
async fn main_with_dataset<R: Send + Sync + 'static>(repository: Arc<R>, args: Args) -> Result<()>
where
for<'a> &'a R: Repository,
{
let repo = repository.clone();
let mediawiki_api = args.mediawiki_api.clone(); let mediawiki_api = args.mediawiki_api.clone();
let mediawiki_base_url = args.mediawiki_base_url.clone(); let mediawiki_base_url = args.mediawiki_base_url.clone();
let namespaces = args let namespaces = args
@ -93,9 +78,10 @@ where
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let slot = args.slot.clone(); let slot = args.slot.clone();
let repo = store.clone();
spawn_blocking(move || { spawn_blocking(move || {
let mut loader = WikibaseLoader::new( let mut loader = WikibaseLoader::new(
repo.as_ref(), repo,
&mediawiki_api, &mediawiki_api,
&mediawiki_base_url, &mediawiki_base_url,
&namespaces, &namespaces,
@ -110,22 +96,16 @@ where
println!("Listening for requests at http://{}", &args.bind); println!("Listening for requests at http://{}", &args.bind);
http_server(args.bind, move |request| { http_server(args.bind, move |request| {
handle_request(request, Arc::clone(&repository)) handle_request(request, store.clone())
}) })
.await .await
} }
async fn handle_request<R: Send + Sync + 'static>( async fn handle_request(request: Request, store: RocksDbStore) -> Result<Response> {
request: Request,
repository: Arc<R>,
) -> Result<Response>
where
for<'a> &'a R: Repository,
{
let mut response = match (request.url().path(), request.method()) { let mut response = match (request.url().path(), request.method()) {
("/query", Method::Get) => { ("/query", Method::Get) => {
evaluate_urlencoded_sparql_query( evaluate_urlencoded_sparql_query(
repository, store,
request.url().query().unwrap_or("").as_bytes().to_vec(), request.url().query().unwrap_or("").as_bytes().to_vec(),
request, request,
) )
@ -141,7 +121,7 @@ where
.take(MAX_SPARQL_BODY_SIZE) .take(MAX_SPARQL_BODY_SIZE)
.read_to_string(&mut buffer) .read_to_string(&mut buffer)
.await?; .await?;
evaluate_sparql_query(repository, buffer, request).await? evaluate_sparql_query(store, buffer, request).await?
} else if essence(&content_type) == "application/x-www-form-urlencoded" { } else if essence(&content_type) == "application/x-www-form-urlencoded" {
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let mut request = request; let mut request = request;
@ -150,7 +130,7 @@ where
.take(MAX_SPARQL_BODY_SIZE) .take(MAX_SPARQL_BODY_SIZE)
.read_to_end(&mut buffer) .read_to_end(&mut buffer)
.await?; .await?;
evaluate_urlencoded_sparql_query(repository, buffer, request).await? evaluate_urlencoded_sparql_query(store, buffer, request).await?
} else { } else {
simple_response( simple_response(
StatusCode::UnsupportedMediaType, StatusCode::UnsupportedMediaType,
@ -178,16 +158,13 @@ fn simple_response(status: StatusCode, body: impl Into<Body>) -> Response {
response response
} }
async fn evaluate_urlencoded_sparql_query<R: Send + Sync + 'static>( async fn evaluate_urlencoded_sparql_query(
repository: Arc<R>, store: RocksDbStore,
encoded: Vec<u8>, encoded: Vec<u8>,
request: Request, request: Request,
) -> Result<Response> ) -> Result<Response> {
where
for<'a> &'a R: Repository,
{
if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") { if let Some((_, query)) = form_urlencoded::parse(&encoded).find(|(k, _)| k == "query") {
evaluate_sparql_query(repository, query.to_string(), request).await evaluate_sparql_query(store, query.to_string(), request).await
} else { } else {
Ok(simple_response( Ok(simple_response(
StatusCode::BadRequest, StatusCode::BadRequest,
@ -196,18 +173,14 @@ where
} }
} }
async fn evaluate_sparql_query<R: Send + Sync + 'static>( async fn evaluate_sparql_query(
repository: Arc<R>, store: RocksDbStore,
query: String, query: String,
request: Request, request: Request,
) -> Result<Response> ) -> Result<Response> {
where
for<'a> &'a R: Repository,
{
spawn_blocking(move || { spawn_blocking(move || {
//TODO: stream //TODO: stream
let query = repository let query = store
.connection()?
.prepare_query(&query, QueryOptions::default()) .prepare_query(&query, QueryOptions::default())
.map_err(|e| { .map_err(|e| {
let mut e = Error::from(e); let mut e = Error::from(e);

Loading…
Cancel
Save