Adds proper write transaction support

pull/22/head
Tpt 5 years ago committed by Tpt
parent 5f0c5b150a
commit 62e3d14984
  1. 18
      lib/src/lib.rs
  2. 179
      lib/src/repository.rs
  3. 279
      lib/src/store/memory.rs
  4. 127
      lib/src/store/mod.rs
  5. 136
      lib/src/store/rocksdb.rs
  6. 31
      wikibase/src/loader.rs

@ -10,28 +10,29 @@
//!
//! ```
//! use oxigraph::model::*;
//! use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
//! use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result};
//! use crate::oxigraph::sparql::{PreparedQuery, QueryOptions};
//! use oxigraph::sparql::QueryResult;
//!
//! let repository = MemoryRepository::default();
//! let mut connection = repository.connection().unwrap();
//! let mut connection = repository.connection()?;
//!
//! // insertion
//! let ex = NamedNode::parse("http://example.com").unwrap();
//! let ex = NamedNode::parse("http://example.com")?;
//! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
//! connection.insert(&quad);
//! connection.insert(&quad)?;
//!
//! // quad filter
//! let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
//! assert_eq!(vec![quad], results.unwrap());
//! assert_eq!(vec![quad], results?);
//!
//! // SPARQL query
//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap();
//! let results = prepared_query.exec().unwrap();
//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
//! let results = prepared_query.exec()?;
//! if let QueryResult::Bindings(results) = results {
//! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
//! assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
//! }
//! # Result::Ok(())
//! ```
pub mod model;
@ -44,6 +45,7 @@ pub use failure::Error;
pub type Result<T> = ::std::result::Result<T, failure::Error>;
pub use crate::repository::Repository;
pub use crate::repository::RepositoryConnection;
pub use crate::repository::RepositoryTransaction;
pub use crate::store::MemoryRepository;
#[cfg(feature = "rocksdb")]
pub use crate::store::RocksDbRepository;

@ -18,34 +18,35 @@ use std::io::BufRead;
/// use oxigraph::sparql::QueryResult;
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
/// assert_eq!(vec![quad], results?);
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
///
/// The implementation based on RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated.
/// A `RocksDbRepository` could be built using `RocksDbRepository::open` and works just like its in-memory equivalent:
/// ```ignore
/// use oxigraph::RocksDbRepository;
/// let dataset = RocksDbRepository::open("example.db").unwrap();
/// let dataset = RocksDbRepository::open("example.db")?;
/// ```
///
/// Quads insertion and deletion should respect [ACID](https://en.wikipedia.org/wiki/ACID) properties for all implementation.
/// No complex transaction support is provided yet.
/// If you want transaction with [ACID](https://en.wikipedia.org/wiki/ACID) properties you could use the `RepositoryConnection.transaction` method.
/// This transaction support is only limited to writes and does not support reads as part of transactions yet.
pub trait Repository {
type Connection: RepositoryConnection;
@ -54,6 +55,7 @@ pub trait Repository {
/// A connection to a `Repository`
pub trait RepositoryConnection: Clone {
type Transaction: RepositoryTransaction;
type PreparedQuery: PreparedQuery;
/// Prepares a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) and returns an object that could be used to execute it.
@ -63,23 +65,24 @@ pub trait RepositoryConnection: Clone {
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository};
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
/// use oxigraph::sparql::{PreparedQuery, QueryOptions};
/// use oxigraph::sparql::QueryResult;
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertions
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// let ex = NamedNode::parse("http://example.com")?;
/// connection.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None));
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
fn prepare_query(&self, query: &str, options: QueryOptions) -> Result<Self::PreparedQuery>;
@ -98,16 +101,17 @@ pub trait RepositoryConnection: Clone {
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
/// assert_eq!(vec![quad], results?);
/// # Result::Ok(())
/// ```
fn quads_for_pattern<'a>(
&'a self,
@ -119,15 +123,48 @@ pub trait RepositoryConnection: Clone {
where
Self: 'a;
/// Checks if this repository contains a given quad
fn contains(&self, quad: &Quad) -> Result<bool>;
/// Executes a transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the clusre returns `Err`.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection()?;
///
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
///
/// // transaction
/// connection.transaction(|transaction| {
/// transaction.insert(&quad)
/// });
///
/// // quad filter
/// assert!(connection.contains(&quad).unwrap());
/// # Result::Ok(())
/// ```
fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()>;
/// Loads a graph file (i.e. triples) into the repository
///
/// Warning: This functions saves the triples in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, GraphSyntax};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
@ -135,8 +172,9 @@ pub trait RepositoryConnection: Clone {
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results.unwrap());
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
fn load_graph(
&mut self,
@ -146,7 +184,10 @@ pub trait RepositoryConnection: Clone {
base_iri: Option<&str>,
) -> Result<()>;
/// Loads a dataset file (i.e. quads) into the repository
/// Loads a dataset file (i.e. quads) into the repository.
///
/// Warning: This functions saves the quads in batch. If the parsing fails in the middle of the file,
/// only a part of it may be written. Use a (memory greedy) transaction if you do not want that.
///
/// Usage example:
/// ```
@ -154,7 +195,7 @@ pub trait RepositoryConnection: Clone {
/// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, DatasetSyntax};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
@ -162,8 +203,9 @@ pub trait RepositoryConnection: Clone {
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results.unwrap());
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
fn load_dataset(
&mut self,
@ -172,12 +214,91 @@ pub trait RepositoryConnection: Clone {
base_iri: Option<&str>,
) -> Result<()>;
/// Checks if this repository contains a given quad
fn contains(&self, quad: &Quad) -> Result<bool>;
/// Adds a quad to this repository.
///
/// If you want to insert a lot of quads at the same time,
/// you should probably use an `auto_transaction`.
///
/// To make a transaction, you could use `transaction`.
fn insert(&mut self, quad: &Quad) -> Result<()>;
/// Removes a quad from this repository.
///
/// If you want to remove a lot of quads at the same time,
/// you should probably use an `auto_transaction`.
///
/// To make a transaction, you could use `transaction`.
fn remove(&mut self, quad: &Quad) -> Result<()>;
}
/// A transaction done on a `RepositoryConnection`
pub trait RepositoryTransaction {
/// Adds quads from a graph file into the transaction insertions.
///
/// Warning: It loads all the files triples into main memory.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, GraphSyntax};
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// connection.transaction(|transaction|
/// transaction.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None)
/// );
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?);
/// # Result::Ok(())
/// ```
fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()>;
/// Adds quads from a dataset file into the transaction insertions.
///
/// Warning: It loads all the files quads into main memory.
///
/// Usage example:
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, DatasetSyntax};
///
/// let repository = MemoryRepository::default();
/// let connection = repository.connection()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// connection.transaction(|transaction|
/// transaction.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None)
/// );
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// let ex = NamedNode::parse("http://example.com")?;
/// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?);
/// # Result::Ok(())
/// ```
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()>;
/// Adds a quad to this repository
/// Adds a quad insertion to this transaction
fn insert(&mut self, quad: &Quad) -> Result<()>;
/// Removes a quad from this repository
/// Adds a quad removals for this transaction
fn remove(&mut self, quad: &Quad) -> Result<()>;
}

@ -17,23 +17,24 @@ use std::sync::{PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard};
/// use oxigraph::sparql::{QueryResult, QueryOptions};
///
/// let repository = MemoryRepository::default();
/// let mut connection = repository.connection().unwrap();
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
/// connection.insert(&quad)?;
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
/// assert_eq!(vec![quad], results?);
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// # Result::Ok(())
/// ```
#[derive(Default)]
pub struct MemoryRepository {
@ -44,7 +45,6 @@ pub type MemoryRepositoryConnection<'a> = StoreRepositoryConnection<&'a MemorySt
type TripleMap<T> = BTreeMap<T, BTreeMap<T, BTreeSet<T>>>;
type QuadMap<T> = BTreeMap<T, TripleMap<T>>;
#[derive(Default)]
pub struct MemoryStore {
indexes: RwLock<MemoryStoreIndexes>,
}
@ -57,7 +57,21 @@ struct MemoryStoreIndexes {
gspo: QuadMap<EncodedTerm>,
gpos: QuadMap<EncodedTerm>,
gosp: QuadMap<EncodedTerm>,
str_store: MemoryStrStore,
id2str: BTreeMap<u128, String>,
}
impl Default for MemoryStore {
fn default() -> Self {
let new = Self {
indexes: RwLock::default(),
};
let mut transaction = (&new).connection().unwrap().transaction();
transaction.set_first_strings().unwrap();
transaction.commit().unwrap();
new
}
}
impl<'a> Repository for &'a MemoryRepository {
@ -78,21 +92,35 @@ impl<'a> Store for &'a MemoryStore {
impl<'a> StrLookup for &'a MemoryStore {
fn get_str(&self, id: u128) -> Result<Option<String>> {
self.indexes()?.str_store.get_str(id)
//TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.indexes()?.id2str.get(&id).cloned())
}
}
impl<'a> StrContainer for &'a MemoryStore {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.indexes_mut()?.str_store.insert_str(key, value)
self.indexes_mut()?
.id2str
.entry(key)
.or_insert_with(|| value.to_owned());
Ok(())
}
}
impl<'a> StoreConnection for &'a MemoryStore {
type Transaction = &'a MemoryStore;
type Transaction = MemoryTransaction<'a>;
type AutoTransaction = &'a MemoryStore;
fn transaction(&self) -> MemoryTransaction<'a> {
MemoryTransaction {
store: self,
ops: Vec::default(),
strings: Vec::default(),
}
}
fn transaction(&self) -> Result<&'a MemoryStore> {
Ok(self)
fn auto_transaction(&self) -> &'a MemoryStore {
self
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
@ -185,99 +213,14 @@ impl<'a> StoreConnection for &'a MemoryStore {
}
}
/// TODO: implement properly
impl<'a> StoreTransaction for &'a MemoryStore {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut quad_indexes = self.indexes_mut()?;
insert_into_quad_map(
&mut quad_indexes.gosp,
quad.graph_name,
quad.object,
quad.subject,
quad.predicate,
);
insert_into_quad_map(
&mut quad_indexes.gpos,
quad.graph_name,
quad.predicate,
quad.object,
quad.subject,
);
insert_into_quad_map(
&mut quad_indexes.gspo,
quad.graph_name,
quad.subject,
quad.predicate,
quad.object,
);
insert_into_quad_map(
&mut quad_indexes.ospg,
quad.object,
quad.subject,
quad.predicate,
quad.graph_name,
);
insert_into_quad_map(
&mut quad_indexes.posg,
quad.predicate,
quad.object,
quad.subject,
quad.graph_name,
);
insert_into_quad_map(
&mut quad_indexes.spog,
quad.subject,
quad.predicate,
quad.object,
quad.graph_name,
);
self.indexes_mut()?.insert_quad(quad);
Ok(())
}
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut quad_indexes = self.indexes_mut()?;
remove_from_quad_map(
&mut quad_indexes.gosp,
&quad.graph_name,
&quad.object,
&quad.subject,
&quad.predicate,
);
remove_from_quad_map(
&mut quad_indexes.gpos,
&quad.graph_name,
&quad.predicate,
&quad.object,
&quad.subject,
);
remove_from_quad_map(
&mut quad_indexes.gspo,
&quad.graph_name,
&quad.subject,
&quad.predicate,
&quad.object,
);
remove_from_quad_map(
&mut quad_indexes.ospg,
&quad.object,
&quad.subject,
&quad.predicate,
&quad.graph_name,
);
remove_from_quad_map(
&mut quad_indexes.posg,
&quad.predicate,
&quad.object,
&quad.subject,
&quad.graph_name,
);
remove_from_quad_map(
&mut quad_indexes.spog,
&quad.subject,
&quad.predicate,
&quad.object,
&quad.graph_name,
);
self.indexes_mut()?.remove_quad(quad);
Ok(())
}
@ -517,6 +460,98 @@ impl MemoryStore {
}
}
impl MemoryStoreIndexes {
fn insert_quad(&mut self, quad: &EncodedQuad) {
insert_into_quad_map(
&mut self.gosp,
quad.graph_name,
quad.object,
quad.subject,
quad.predicate,
);
insert_into_quad_map(
&mut self.gpos,
quad.graph_name,
quad.predicate,
quad.object,
quad.subject,
);
insert_into_quad_map(
&mut self.gspo,
quad.graph_name,
quad.subject,
quad.predicate,
quad.object,
);
insert_into_quad_map(
&mut self.ospg,
quad.object,
quad.subject,
quad.predicate,
quad.graph_name,
);
insert_into_quad_map(
&mut self.posg,
quad.predicate,
quad.object,
quad.subject,
quad.graph_name,
);
insert_into_quad_map(
&mut self.spog,
quad.subject,
quad.predicate,
quad.object,
quad.graph_name,
);
}
fn remove_quad(&mut self, quad: &EncodedQuad) {
remove_from_quad_map(
&mut self.gosp,
&quad.graph_name,
&quad.object,
&quad.subject,
&quad.predicate,
);
remove_from_quad_map(
&mut self.gpos,
&quad.graph_name,
&quad.predicate,
&quad.object,
&quad.subject,
);
remove_from_quad_map(
&mut self.gspo,
&quad.graph_name,
&quad.subject,
&quad.predicate,
&quad.object,
);
remove_from_quad_map(
&mut self.ospg,
&quad.object,
&quad.subject,
&quad.predicate,
&quad.graph_name,
);
remove_from_quad_map(
&mut self.posg,
&quad.predicate,
&quad.object,
&quad.subject,
&quad.graph_name,
);
remove_from_quad_map(
&mut self.spog,
&quad.subject,
&quad.predicate,
&quad.object,
&quad.graph_name,
);
}
}
fn wrap_error<'a, E: 'static, I: Iterator<Item = Result<E>> + 'a>(
iter: Result<I>,
) -> Box<dyn Iterator<Item = Result<E>> + 'a> {
@ -603,6 +638,48 @@ fn quad_map_flatten<'a, T: Copy>(gspo: &'a QuadMap<T>) -> impl Iterator<Item = (
})
}
pub struct MemoryTransaction<'a> {
store: &'a MemoryStore,
ops: Vec<TransactionOp>,
strings: Vec<(u128, String)>,
}
enum TransactionOp {
Insert(EncodedQuad),
Delete(EncodedQuad),
}
impl StrContainer for MemoryTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.strings.push((key, value.to_owned()));
Ok(())
}
}
impl StoreTransaction for MemoryTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.ops.push(TransactionOp::Insert(quad.clone()));
Ok(())
}
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.ops.push(TransactionOp::Delete(quad.clone()));
Ok(())
}
fn commit(self) -> Result<()> {
let mut indexes = self.store.indexes_mut()?;
indexes.id2str.extend(self.strings);
for op in self.ops {
match op {
TransactionOp::Insert(quad) => indexes.insert_quad(&quad),
TransactionOp::Delete(quad) => indexes.remove_quad(&quad),
}
}
Ok(())
}
}
#[derive(Debug, Fail)]
#[fail(display = "Mutex Mutex was poisoned")]
pub struct MutexPoisonError {

@ -11,6 +11,7 @@ pub use crate::store::memory::MemoryRepository;
pub use crate::store::rocksdb::RocksDbRepository;
use crate::model::*;
use crate::repository::RepositoryTransaction;
use crate::sparql::{QueryOptions, SimplePreparedQuery};
use crate::store::numeric_encoder::*;
use crate::{DatasetSyntax, GraphSyntax, RepositoryConnection, Result};
@ -31,11 +32,11 @@ pub trait Store {
/// A connection to a `Store`
pub trait StoreConnection: StrLookup + Sized + Clone {
type Transaction: StoreTransaction;
type AutoTransaction: StoreTransaction;
/// Creates an edition transaction
/// TODO: current transaction implementations could commit before the call to commit()
/// It's why this API is not exposed publicly yet
fn transaction(&self) -> Result<Self::Transaction>;
fn transaction(&self) -> Self::Transaction;
fn auto_transaction(&self) -> Self::AutoTransaction;
fn contains(&self, quad: &EncodedQuad) -> Result<bool>;
@ -69,13 +70,32 @@ impl<S: StoreConnection> From<S> for StoreRepositoryConnection<S> {
}
}
impl<S: StoreConnection> StoreRepositoryConnection<S> {
#[must_use]
fn auto_transaction(&self) -> StoreRepositoryTransaction<S::AutoTransaction> {
StoreRepositoryTransaction {
inner: self.inner.auto_transaction(),
}
}
}
impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
type Transaction = StoreRepositoryTransaction<S::Transaction>;
type PreparedQuery = SimplePreparedQuery<S>;
fn prepare_query(&self, query: &str, options: QueryOptions) -> Result<SimplePreparedQuery<S>> {
SimplePreparedQuery::new(self.inner.clone(), query, options) //TODO: avoid clone
}
fn prepare_query_from_pattern(
&self,
pattern: &GraphPattern,
options: QueryOptions,
) -> Result<Self::PreparedQuery> {
SimplePreparedQuery::new_from_pattern(self.inner.clone(), pattern, options)
//TODO: avoid clone
}
fn quads_for_pattern<'a>(
&'a self,
subject: Option<&NamedOrBlankNode>,
@ -97,15 +117,60 @@ impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
)
}
fn prepare_query_from_pattern(
&self,
pattern: &GraphPattern,
options: QueryOptions,
) -> Result<Self::PreparedQuery> {
SimplePreparedQuery::new_from_pattern(self.inner.clone(), pattern, options)
//TODO: avoid clone
fn contains(&self, quad: &Quad) -> Result<bool> {
self.inner.contains(&quad.into())
}
#[must_use]
fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()> {
let mut transaction = StoreRepositoryTransaction {
inner: self.inner.transaction(),
};
f(&mut transaction)?;
transaction.inner.commit()
}
fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: Option<&NamedOrBlankNode>,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.load_graph(reader, syntax, to_graph_name, base_iri)?;
transaction.inner.commit()
}
fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.load_dataset(reader, syntax, base_iri)?;
transaction.inner.commit()
}
fn insert(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.insert(&quad)?;
transaction.inner.commit()
}
fn remove(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.auto_transaction();
transaction.remove(&quad)?;
transaction.inner.commit()
}
}
pub struct StoreRepositoryTransaction<T: StoreTransaction> {
inner: T,
}
impl<T: StoreTransaction> RepositoryTransaction for StoreRepositoryTransaction<T> {
fn load_graph(
&mut self,
reader: impl BufRead,
@ -140,26 +205,18 @@ impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
}
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.inner.contains(&quad.into())
}
fn insert(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.inner.transaction()?;
let quad = transaction.encode_quad(quad)?;
transaction.insert(&quad)?;
transaction.commit()
let quad = self.inner.encode_quad(quad)?;
self.inner.insert(&quad)
}
fn remove(&mut self, quad: &Quad) -> Result<()> {
let mut transaction = self.inner.transaction()?;
let quad = transaction.encode_quad(quad)?;
transaction.remove(&quad)?;
transaction.commit()
let quad = quad.into();
self.inner.remove(&quad)
}
}
impl<S: StoreConnection> StoreRepositoryConnection<S> {
impl<T: StoreTransaction> StoreRepositoryTransaction<T> {
fn load_from_triple_parser<P: TriplesParser>(
&mut self,
mut parser: P,
@ -168,32 +225,28 @@ impl<S: StoreConnection> StoreRepositoryConnection<S> {
where
P::Error: Send + Sync + 'static,
{
let mut transaction = self.inner.transaction()?;
let mut bnode_map = HashMap::default();
let graph_name = if let Some(graph_name) = to_graph_name {
transaction.encode_named_or_blank_node(graph_name)?
self.inner.encode_named_or_blank_node(graph_name)?
} else {
EncodedTerm::DefaultGraph
};
let tr = &mut transaction;
parser.parse_all(&mut move |t| {
let quad = tr.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
tr.insert(&quad)
})?;
transaction.commit() //TODO: partials commits
let quad = self
.inner
.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
self.inner.insert(&quad)
})
}
fn load_from_quad_parser<P: QuadsParser>(&mut self, mut parser: P) -> Result<()>
where
P::Error: Send + Sync + 'static,
{
let mut transaction = self.inner.transaction()?;
let mut bnode_map = HashMap::default();
let tr = &mut transaction;
parser.parse_all(&mut move |q| {
let quad = tr.encode_rio_quad(q, &mut bnode_map)?;
tr.insert(&quad)
})?;
transaction.commit() //TODO: partials commits
let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?;
self.inner.insert(&quad)
})
}
}

@ -14,30 +14,36 @@ use std::str;
/// To use it, the `"rocksdb"` feature need to be activated.
///
/// Usage example:
/// ```ignored
/// ```
/// use oxigraph::model::*;
/// use oxigraph::{Repository, RepositoryConnection, RocksDbRepository, Result};
/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, RocksDbRepository, Result};
/// use crate::oxigraph::sparql::{PreparedQuery, QueryOptions};
/// use oxigraph::sparql::QueryResult;
/// # use std::fs::remove_dir_all;
///
/// let repository = RocksDbRepository::open("example.db").unwrap();
/// let mut connection = repository.connection().unwrap();
/// # {
/// let repository = RocksDbRepository::open("example.db")?;
/// let mut connection = repository.connection()?;
///
/// // insertion
/// let ex = NamedNode::parse("http://example.com").unwrap();
/// let ex = NamedNode::parse("http://example.com")?;
/// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None);
/// connection.insert(&quad);
/// connection.insert(&quad)?;
///
/// // quad filter
/// let results: Result<Vec<Quad>> = connection.quads_for_pattern(None, None, None, None).collect();
/// assert_eq!(vec![quad], results.unwrap());
/// assert_eq!(vec![quad], results?);
///
/// // SPARQL query
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap();
/// let results = prepared_query.exec().unwrap();
/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?;
/// let results = prepared_query.exec()?;
/// if let QueryResult::Bindings(results) = results {
/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into()));
/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into()));
/// }
/// #
/// # }
/// # remove_dir_all("example.db")?;
/// # Result::Ok(())
/// ```
pub struct RocksDbRepository {
inner: RocksDbStore,
@ -106,7 +112,7 @@ impl RocksDbStore {
db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?,
};
let mut transaction = (&new).connection()?.transaction()?;
let mut transaction = (&new).connection()?.transaction();
transaction.set_first_strings()?;
transaction.commit()?;
@ -143,13 +149,26 @@ impl StrLookup for RocksDbStoreConnection<'_> {
impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
type Transaction = RocksDbStoreTransaction<'a>;
type AutoTransaction = RocksDbStoreAutoTransaction<'a>;
fn transaction(&self) -> RocksDbStoreTransaction<'a> {
RocksDbStoreTransaction {
inner: RocksDbStoreInnerTransaction {
connection: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn transaction(&self) -> Result<RocksDbStoreTransaction<'a>> {
Ok(RocksDbStoreTransaction {
connection: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
})
fn auto_transaction(&self) -> RocksDbStoreAutoTransaction<'a> {
RocksDbStoreAutoTransaction {
inner: RocksDbStoreInnerTransaction {
connection: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
}
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
@ -427,20 +446,81 @@ impl<'a> RocksDbStoreConnection<'a> {
}
pub struct RocksDbStoreTransaction<'a> {
inner: RocksDbStoreInnerTransaction<'a>,
}
impl StrContainer for RocksDbStoreTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.inner.insert_str(key, value)
}
}
impl StoreTransaction for RocksDbStoreTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.insert(quad)
}
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove(quad)
}
fn commit(self) -> Result<()> {
self.inner.commit()
}
}
pub struct RocksDbStoreAutoTransaction<'a> {
inner: RocksDbStoreInnerTransaction<'a>,
}
impl StrContainer for RocksDbStoreAutoTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.inner.insert_str(key, value)
}
}
impl StoreTransaction for RocksDbStoreAutoTransaction<'_> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.insert(quad)?;
self.commit_if_big()
}
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove(quad)?;
self.commit_if_big()
}
fn commit(self) -> Result<()> {
self.inner.commit()
}
}
impl RocksDbStoreAutoTransaction<'_> {
fn commit_if_big(&mut self) -> Result<()> {
if self.inner.batch.len() > MAX_TRANSACTION_SIZE {
self.inner
.connection
.store
.db
.write(replace(&mut self.inner.batch, WriteBatch::default()))?;
}
Ok(())
}
}
struct RocksDbStoreInnerTransaction<'a> {
connection: RocksDbStoreConnection<'a>,
batch: WriteBatch,
buffer: Vec<u8>,
}
impl StrContainer for RocksDbStoreTransaction<'_> {
impl RocksDbStoreInnerTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.batch
.put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value)?;
Ok(())
}
}
impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.buffer.write_spog_quad(quad)?;
self.batch
@ -472,13 +552,6 @@ impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> {
.put_cf(self.connection.gosp_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
if self.batch.len() > MAX_TRANSACTION_SIZE {
self.connection
.store
.db
.write(replace(&mut self.batch, WriteBatch::default()))?;
}
Ok(())
}
@ -513,13 +586,6 @@ impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> {
.delete_cf(self.connection.gosp_cf, &self.buffer)?;
self.buffer.clear();
if self.batch.len() > MAX_TRANSACTION_SIZE {
self.connection
.store
.db
.write(replace(&mut self.batch, WriteBatch::default()))?;
}
Ok(())
}
@ -593,7 +659,7 @@ impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator
#[test]
fn repository() -> Result<()> {
use crate::model::*;
use crate::repository::RepositoryConnection;
use crate::*;
use rand::random;
use std::env::temp_dir;
use std::fs::remove_dir_all;

@ -1,7 +1,7 @@
use crate::SERVER;
use chrono::{DateTime, Utc};
use oxigraph::model::NamedNode;
use oxigraph::{GraphSyntax, Repository, RepositoryConnection, Result};
use oxigraph::*;
use reqwest::header::USER_AGENT;
use reqwest::{Client, Url};
use serde_json::Value;
@ -192,21 +192,22 @@ impl<R: Repository + Copy> WikibaseLoader<R> {
}
fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> {
let mut connection = self.repository.connection()?;
let connection = self.repository.connection()?;
let graph_name = NamedNode::parse(uri)?.into();
connection.transaction(|transaction| {
let to_remove = connection
.quads_for_pattern(None, None, None, Some(Some(&graph_name)))
.collect::<Result<Vec<_>>>()?;
for q in to_remove {
transaction.remove(&q)?;
}
let to_remove = connection
.quads_for_pattern(None, None, None, Some(Some(&graph_name)))
.collect::<Result<Vec<_>>>()?;
for q in to_remove {
connection.remove(&q)?;
}
connection.load_graph(
BufReader::new(data),
GraphSyntax::NTriples,
Some(&NamedNode::parse(uri)?.into()),
None,
)
transaction.load_graph(
BufReader::new(data),
GraphSyntax::NTriples,
Some(&NamedNode::parse(uri)?.into()),
None,
)
})
}
}

Loading…
Cancel
Save