From 62e3d149841162e925381eecd04af0e273b09619 Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 29 Oct 2019 21:35:33 +0100 Subject: [PATCH] Adds proper write transaction support --- lib/src/lib.rs | 18 +-- lib/src/repository.rs | 179 +++++++++++++++++++++---- lib/src/store/memory.rs | 279 +++++++++++++++++++++++++-------------- lib/src/store/mod.rs | 127 ++++++++++++------ lib/src/store/rocksdb.rs | 136 ++++++++++++++----- wikibase/src/loader.rs | 31 ++--- 6 files changed, 545 insertions(+), 225 deletions(-) diff --git a/lib/src/lib.rs b/lib/src/lib.rs index fd1a86dd..5893e495 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -10,28 +10,29 @@ //! //! ``` //! use oxigraph::model::*; -//! use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result}; +//! use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result}; //! use crate::oxigraph::sparql::{PreparedQuery, QueryOptions}; //! use oxigraph::sparql::QueryResult; //! //! let repository = MemoryRepository::default(); -//! let mut connection = repository.connection().unwrap(); +//! let mut connection = repository.connection()?; //! //! // insertion -//! let ex = NamedNode::parse("http://example.com").unwrap(); +//! let ex = NamedNode::parse("http://example.com")?; //! let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); -//! connection.insert(&quad); +//! connection.insert(&quad)?; //! //! // quad filter //! let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); -//! assert_eq!(vec![quad], results.unwrap()); +//! assert_eq!(vec![quad], results?); //! //! // SPARQL query -//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap(); -//! let results = prepared_query.exec().unwrap(); +//! let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; +//! let results = prepared_query.exec()?; //! if let QueryResult::Bindings(results) = results { -//! assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +//! assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); //! } +//! # Result::Ok(()) //! ``` pub mod model; @@ -44,6 +45,7 @@ pub use failure::Error; pub type Result = ::std::result::Result; pub use crate::repository::Repository; pub use crate::repository::RepositoryConnection; +pub use crate::repository::RepositoryTransaction; pub use crate::store::MemoryRepository; #[cfg(feature = "rocksdb")] pub use crate::store::RocksDbRepository; diff --git a/lib/src/repository.rs b/lib/src/repository.rs index f8ab40d0..8f31eba1 100644 --- a/lib/src/repository.rs +++ b/lib/src/repository.rs @@ -18,34 +18,35 @@ use std::io::BufRead; /// use oxigraph::sparql::QueryResult; /// /// let repository = MemoryRepository::default(); -/// let mut connection = repository.connection().unwrap(); +/// let mut connection = repository.connection()?; /// /// // insertion -/// let ex = NamedNode::parse("http://example.com").unwrap(); +/// let ex = NamedNode::parse("http://example.com")?; /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); /// connection.insert(&quad); /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); -/// assert_eq!(vec![quad], results.unwrap()); +/// assert_eq!(vec![quad], results?); /// /// // SPARQL query -/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap(); -/// let results = prepared_query.exec().unwrap(); +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; +/// let results = prepared_query.exec()?; /// if let QueryResult::Bindings(results) = results { -/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// } +/// # Result::Ok(()) /// ``` /// /// The implementation based on RocksDB if disabled by default and requires the `"rocksdb"` feature to be activated. /// A `RocksDbRepository` could be built using `RocksDbRepository::open` and works just like its in-memory equivalent: /// ```ignore /// use oxigraph::RocksDbRepository; -/// let dataset = RocksDbRepository::open("example.db").unwrap(); +/// let dataset = RocksDbRepository::open("example.db")?; /// ``` /// -/// Quads insertion and deletion should respect [ACID](https://en.wikipedia.org/wiki/ACID) properties for all implementation. -/// No complex transaction support is provided yet. +/// If you want transaction with [ACID](https://en.wikipedia.org/wiki/ACID) properties you could use the `RepositoryConnection.transaction` method. +/// This transaction support is only limited to writes and does not support reads as part of transactions yet. pub trait Repository { type Connection: RepositoryConnection; @@ -54,6 +55,7 @@ pub trait Repository { /// A connection to a `Repository` pub trait RepositoryConnection: Clone { + type Transaction: RepositoryTransaction; type PreparedQuery: PreparedQuery; /// Prepares a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/) and returns an object that could be used to execute it. @@ -63,23 +65,24 @@ pub trait RepositoryConnection: Clone { /// Usage example: /// ``` /// use oxigraph::model::*; - /// use oxigraph::{Repository, RepositoryConnection, MemoryRepository}; + /// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result}; /// use oxigraph::sparql::{PreparedQuery, QueryOptions}; /// use oxigraph::sparql::QueryResult; /// /// let repository = MemoryRepository::default(); - /// let mut connection = repository.connection().unwrap(); + /// let mut connection = repository.connection()?; /// /// // insertions - /// let ex = NamedNode::parse("http://example.com").unwrap(); + /// let ex = NamedNode::parse("http://example.com")?; /// connection.insert(&Quad::new(ex.clone(), ex.clone(), ex.clone(), None)); /// /// // SPARQL query - /// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap(); - /// let results = prepared_query.exec().unwrap(); + /// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; + /// let results = prepared_query.exec()?; /// if let QueryResult::Bindings(results) = results { - /// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); + /// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// } + /// # Result::Ok(()) /// ``` fn prepare_query(&self, query: &str, options: QueryOptions) -> Result; @@ -98,16 +101,17 @@ pub trait RepositoryConnection: Clone { /// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result}; /// /// let repository = MemoryRepository::default(); - /// let mut connection = repository.connection().unwrap(); + /// let mut connection = repository.connection()?; /// /// // insertion - /// let ex = NamedNode::parse("http://example.com").unwrap(); + /// let ex = NamedNode::parse("http://example.com")?; /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); /// connection.insert(&quad); /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); - /// assert_eq!(vec![quad], results.unwrap()); + /// assert_eq!(vec![quad], results?); + /// # Result::Ok(()) /// ``` fn quads_for_pattern<'a>( &'a self, @@ -119,15 +123,48 @@ pub trait RepositoryConnection: Clone { where Self: 'a; + /// Checks if this repository contains a given quad + fn contains(&self, quad: &Quad) -> Result; + + /// Executes a transaction. + /// + /// The transaction is executed if the given closure returns `Ok`. + /// Nothing is done if the clusre returns `Err`. + /// + /// Usage example: + /// ``` + /// use oxigraph::model::*; + /// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result}; + /// + /// let repository = MemoryRepository::default(); + /// let mut connection = repository.connection()?; + /// + /// let ex = NamedNode::parse("http://example.com")?; + /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); + /// + /// // transaction + /// connection.transaction(|transaction| { + /// transaction.insert(&quad) + /// }); + /// + /// // quad filter + /// assert!(connection.contains(&quad).unwrap()); + /// # Result::Ok(()) + /// ``` + fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()>; + /// Loads a graph file (i.e. triples) into the repository /// + /// Warning: This functions saves the triples in batch. If the parsing fails in the middle of the file, + /// only a part of it may be written. Use a (memory greedy) transaction if you do not want that. + /// /// Usage example: /// ``` /// use oxigraph::model::*; /// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, GraphSyntax}; /// /// let repository = MemoryRepository::default(); - /// let mut connection = repository.connection().unwrap(); + /// let mut connection = repository.connection()?; /// /// // insertion /// let file = b" ."; @@ -135,8 +172,9 @@ pub trait RepositoryConnection: Clone { /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); - /// let ex = NamedNode::parse("http://example.com").unwrap(); - /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results.unwrap()); + /// let ex = NamedNode::parse("http://example.com")?; + /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?); + /// # Result::Ok(()) /// ``` fn load_graph( &mut self, @@ -146,7 +184,10 @@ pub trait RepositoryConnection: Clone { base_iri: Option<&str>, ) -> Result<()>; - /// Loads a dataset file (i.e. quads) into the repository + /// Loads a dataset file (i.e. quads) into the repository. + /// + /// Warning: This functions saves the quads in batch. If the parsing fails in the middle of the file, + /// only a part of it may be written. Use a (memory greedy) transaction if you do not want that. /// /// Usage example: /// ``` @@ -154,7 +195,7 @@ pub trait RepositoryConnection: Clone { /// use oxigraph::{Repository, RepositoryConnection, MemoryRepository, Result, DatasetSyntax}; /// /// let repository = MemoryRepository::default(); - /// let mut connection = repository.connection().unwrap(); + /// let mut connection = repository.connection()?; /// /// // insertion /// let file = b" ."; @@ -162,8 +203,9 @@ pub trait RepositoryConnection: Clone { /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); - /// let ex = NamedNode::parse("http://example.com").unwrap(); - /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results.unwrap()); + /// let ex = NamedNode::parse("http://example.com")?; + /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?); + /// # Result::Ok(()) /// ``` fn load_dataset( &mut self, @@ -172,12 +214,91 @@ pub trait RepositoryConnection: Clone { base_iri: Option<&str>, ) -> Result<()>; - /// Checks if this repository contains a given quad - fn contains(&self, quad: &Quad) -> Result; + /// Adds a quad to this repository. + /// + /// If you want to insert a lot of quads at the same time, + /// you should probably use an `auto_transaction`. + /// + /// To make a transaction, you could use `transaction`. + fn insert(&mut self, quad: &Quad) -> Result<()>; + + /// Removes a quad from this repository. + /// + /// If you want to remove a lot of quads at the same time, + /// you should probably use an `auto_transaction`. + /// + /// To make a transaction, you could use `transaction`. + fn remove(&mut self, quad: &Quad) -> Result<()>; +} + +/// A transaction done on a `RepositoryConnection` +pub trait RepositoryTransaction { + /// Adds quads from a graph file into the transaction insertions. + /// + /// Warning: It loads all the files triples into main memory. + /// + /// Usage example: + /// ``` + /// use oxigraph::model::*; + /// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, GraphSyntax}; + /// + /// let repository = MemoryRepository::default(); + /// let connection = repository.connection()?; + /// + /// // insertion + /// let file = b" ."; + /// connection.transaction(|transaction| + /// transaction.load_graph(file.as_ref(), GraphSyntax::NTriples, None, None) + /// ); + /// + /// // quad filter + /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); + /// let ex = NamedNode::parse("http://example.com")?; + /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), None)], results?); + /// # Result::Ok(()) + /// ``` + fn load_graph( + &mut self, + reader: impl BufRead, + syntax: GraphSyntax, + to_graph_name: Option<&NamedOrBlankNode>, + base_iri: Option<&str>, + ) -> Result<()>; + + /// Adds quads from a dataset file into the transaction insertions. + /// + /// Warning: It loads all the files quads into main memory. + /// + /// Usage example: + /// ``` + /// use oxigraph::model::*; + /// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, MemoryRepository, Result, DatasetSyntax}; + /// + /// let repository = MemoryRepository::default(); + /// let connection = repository.connection()?; + /// + /// // insertion + /// let file = b" ."; + /// connection.transaction(|transaction| + /// transaction.load_dataset(file.as_ref(), DatasetSyntax::NQuads, None) + /// ); + /// + /// // quad filter + /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); + /// let ex = NamedNode::parse("http://example.com")?; + /// assert_eq!(vec![Quad::new(ex.clone(), ex.clone(), ex.clone(), Some(ex.into()))], results?); + /// # Result::Ok(()) + /// ``` + fn load_dataset( + &mut self, + reader: impl BufRead, + syntax: DatasetSyntax, + base_iri: Option<&str>, + ) -> Result<()>; - /// Adds a quad to this repository + /// Adds a quad insertion to this transaction fn insert(&mut self, quad: &Quad) -> Result<()>; - /// Removes a quad from this repository + /// Adds a quad removals for this transaction fn remove(&mut self, quad: &Quad) -> Result<()>; } diff --git a/lib/src/store/memory.rs b/lib/src/store/memory.rs index a950672c..e3ab2688 100644 --- a/lib/src/store/memory.rs +++ b/lib/src/store/memory.rs @@ -17,23 +17,24 @@ use std::sync::{PoisonError, RwLock, RwLockReadGuard, RwLockWriteGuard}; /// use oxigraph::sparql::{QueryResult, QueryOptions}; /// /// let repository = MemoryRepository::default(); -/// let mut connection = repository.connection().unwrap(); +/// let mut connection = repository.connection()?; /// /// // insertion -/// let ex = NamedNode::parse("http://example.com").unwrap(); +/// let ex = NamedNode::parse("http://example.com")?; /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); -/// connection.insert(&quad); +/// connection.insert(&quad)?; /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); -/// assert_eq!(vec![quad], results.unwrap()); +/// assert_eq!(vec![quad], results?); /// /// // SPARQL query -/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap(); -/// let results = prepared_query.exec().unwrap(); +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; +/// let results = prepared_query.exec()?; /// if let QueryResult::Bindings(results) = results { -/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// } +/// # Result::Ok(()) /// ``` #[derive(Default)] pub struct MemoryRepository { @@ -44,7 +45,6 @@ pub type MemoryRepositoryConnection<'a> = StoreRepositoryConnection<&'a MemorySt type TripleMap = BTreeMap>>; type QuadMap = BTreeMap>; -#[derive(Default)] pub struct MemoryStore { indexes: RwLock, } @@ -57,7 +57,21 @@ struct MemoryStoreIndexes { gspo: QuadMap, gpos: QuadMap, gosp: QuadMap, - str_store: MemoryStrStore, + id2str: BTreeMap, +} + +impl Default for MemoryStore { + fn default() -> Self { + let new = Self { + indexes: RwLock::default(), + }; + + let mut transaction = (&new).connection().unwrap().transaction(); + transaction.set_first_strings().unwrap(); + transaction.commit().unwrap(); + + new + } } impl<'a> Repository for &'a MemoryRepository { @@ -78,21 +92,35 @@ impl<'a> Store for &'a MemoryStore { impl<'a> StrLookup for &'a MemoryStore { fn get_str(&self, id: u128) -> Result> { - self.indexes()?.str_store.get_str(id) + //TODO: avoid copy by adding a lifetime limit to get_str + Ok(self.indexes()?.id2str.get(&id).cloned()) } } impl<'a> StrContainer for &'a MemoryStore { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { - self.indexes_mut()?.str_store.insert_str(key, value) + self.indexes_mut()? + .id2str + .entry(key) + .or_insert_with(|| value.to_owned()); + Ok(()) } } impl<'a> StoreConnection for &'a MemoryStore { - type Transaction = &'a MemoryStore; + type Transaction = MemoryTransaction<'a>; + type AutoTransaction = &'a MemoryStore; + + fn transaction(&self) -> MemoryTransaction<'a> { + MemoryTransaction { + store: self, + ops: Vec::default(), + strings: Vec::default(), + } + } - fn transaction(&self) -> Result<&'a MemoryStore> { - Ok(self) + fn auto_transaction(&self) -> &'a MemoryStore { + self } fn contains(&self, quad: &EncodedQuad) -> Result { @@ -185,99 +213,14 @@ impl<'a> StoreConnection for &'a MemoryStore { } } -/// TODO: implement properly impl<'a> StoreTransaction for &'a MemoryStore { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { - let mut quad_indexes = self.indexes_mut()?; - insert_into_quad_map( - &mut quad_indexes.gosp, - quad.graph_name, - quad.object, - quad.subject, - quad.predicate, - ); - insert_into_quad_map( - &mut quad_indexes.gpos, - quad.graph_name, - quad.predicate, - quad.object, - quad.subject, - ); - insert_into_quad_map( - &mut quad_indexes.gspo, - quad.graph_name, - quad.subject, - quad.predicate, - quad.object, - ); - insert_into_quad_map( - &mut quad_indexes.ospg, - quad.object, - quad.subject, - quad.predicate, - quad.graph_name, - ); - insert_into_quad_map( - &mut quad_indexes.posg, - quad.predicate, - quad.object, - quad.subject, - quad.graph_name, - ); - insert_into_quad_map( - &mut quad_indexes.spog, - quad.subject, - quad.predicate, - quad.object, - quad.graph_name, - ); + self.indexes_mut()?.insert_quad(quad); Ok(()) } fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { - let mut quad_indexes = self.indexes_mut()?; - remove_from_quad_map( - &mut quad_indexes.gosp, - &quad.graph_name, - &quad.object, - &quad.subject, - &quad.predicate, - ); - remove_from_quad_map( - &mut quad_indexes.gpos, - &quad.graph_name, - &quad.predicate, - &quad.object, - &quad.subject, - ); - remove_from_quad_map( - &mut quad_indexes.gspo, - &quad.graph_name, - &quad.subject, - &quad.predicate, - &quad.object, - ); - remove_from_quad_map( - &mut quad_indexes.ospg, - &quad.object, - &quad.subject, - &quad.predicate, - &quad.graph_name, - ); - remove_from_quad_map( - &mut quad_indexes.posg, - &quad.predicate, - &quad.object, - &quad.subject, - &quad.graph_name, - ); - remove_from_quad_map( - &mut quad_indexes.spog, - &quad.subject, - &quad.predicate, - &quad.object, - &quad.graph_name, - ); + self.indexes_mut()?.remove_quad(quad); Ok(()) } @@ -517,6 +460,98 @@ impl MemoryStore { } } +impl MemoryStoreIndexes { + fn insert_quad(&mut self, quad: &EncodedQuad) { + insert_into_quad_map( + &mut self.gosp, + quad.graph_name, + quad.object, + quad.subject, + quad.predicate, + ); + insert_into_quad_map( + &mut self.gpos, + quad.graph_name, + quad.predicate, + quad.object, + quad.subject, + ); + insert_into_quad_map( + &mut self.gspo, + quad.graph_name, + quad.subject, + quad.predicate, + quad.object, + ); + insert_into_quad_map( + &mut self.ospg, + quad.object, + quad.subject, + quad.predicate, + quad.graph_name, + ); + insert_into_quad_map( + &mut self.posg, + quad.predicate, + quad.object, + quad.subject, + quad.graph_name, + ); + insert_into_quad_map( + &mut self.spog, + quad.subject, + quad.predicate, + quad.object, + quad.graph_name, + ); + } + + fn remove_quad(&mut self, quad: &EncodedQuad) { + remove_from_quad_map( + &mut self.gosp, + &quad.graph_name, + &quad.object, + &quad.subject, + &quad.predicate, + ); + remove_from_quad_map( + &mut self.gpos, + &quad.graph_name, + &quad.predicate, + &quad.object, + &quad.subject, + ); + remove_from_quad_map( + &mut self.gspo, + &quad.graph_name, + &quad.subject, + &quad.predicate, + &quad.object, + ); + remove_from_quad_map( + &mut self.ospg, + &quad.object, + &quad.subject, + &quad.predicate, + &quad.graph_name, + ); + remove_from_quad_map( + &mut self.posg, + &quad.predicate, + &quad.object, + &quad.subject, + &quad.graph_name, + ); + remove_from_quad_map( + &mut self.spog, + &quad.subject, + &quad.predicate, + &quad.object, + &quad.graph_name, + ); + } +} + fn wrap_error<'a, E: 'static, I: Iterator> + 'a>( iter: Result, ) -> Box> + 'a> { @@ -603,6 +638,48 @@ fn quad_map_flatten<'a, T: Copy>(gspo: &'a QuadMap) -> impl Iterator { + store: &'a MemoryStore, + ops: Vec, + strings: Vec<(u128, String)>, +} + +enum TransactionOp { + Insert(EncodedQuad), + Delete(EncodedQuad), +} + +impl StrContainer for MemoryTransaction<'_> { + fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { + self.strings.push((key, value.to_owned())); + Ok(()) + } +} + +impl StoreTransaction for MemoryTransaction<'_> { + fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { + self.ops.push(TransactionOp::Insert(quad.clone())); + Ok(()) + } + + fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { + self.ops.push(TransactionOp::Delete(quad.clone())); + Ok(()) + } + + fn commit(self) -> Result<()> { + let mut indexes = self.store.indexes_mut()?; + indexes.id2str.extend(self.strings); + for op in self.ops { + match op { + TransactionOp::Insert(quad) => indexes.insert_quad(&quad), + TransactionOp::Delete(quad) => indexes.remove_quad(&quad), + } + } + Ok(()) + } +} + #[derive(Debug, Fail)] #[fail(display = "Mutex Mutex was poisoned")] pub struct MutexPoisonError { diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index 98761a8b..c5235696 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -11,6 +11,7 @@ pub use crate::store::memory::MemoryRepository; pub use crate::store::rocksdb::RocksDbRepository; use crate::model::*; +use crate::repository::RepositoryTransaction; use crate::sparql::{QueryOptions, SimplePreparedQuery}; use crate::store::numeric_encoder::*; use crate::{DatasetSyntax, GraphSyntax, RepositoryConnection, Result}; @@ -31,11 +32,11 @@ pub trait Store { /// A connection to a `Store` pub trait StoreConnection: StrLookup + Sized + Clone { type Transaction: StoreTransaction; + type AutoTransaction: StoreTransaction; - /// Creates an edition transaction - /// TODO: current transaction implementations could commit before the call to commit() - /// It's why this API is not exposed publicly yet - fn transaction(&self) -> Result; + fn transaction(&self) -> Self::Transaction; + + fn auto_transaction(&self) -> Self::AutoTransaction; fn contains(&self, quad: &EncodedQuad) -> Result; @@ -69,13 +70,32 @@ impl From for StoreRepositoryConnection { } } +impl StoreRepositoryConnection { + #[must_use] + fn auto_transaction(&self) -> StoreRepositoryTransaction { + StoreRepositoryTransaction { + inner: self.inner.auto_transaction(), + } + } +} + impl RepositoryConnection for StoreRepositoryConnection { + type Transaction = StoreRepositoryTransaction; type PreparedQuery = SimplePreparedQuery; fn prepare_query(&self, query: &str, options: QueryOptions) -> Result> { SimplePreparedQuery::new(self.inner.clone(), query, options) //TODO: avoid clone } + fn prepare_query_from_pattern( + &self, + pattern: &GraphPattern, + options: QueryOptions, + ) -> Result { + SimplePreparedQuery::new_from_pattern(self.inner.clone(), pattern, options) + //TODO: avoid clone + } + fn quads_for_pattern<'a>( &'a self, subject: Option<&NamedOrBlankNode>, @@ -97,15 +117,60 @@ impl RepositoryConnection for StoreRepositoryConnection { ) } - fn prepare_query_from_pattern( - &self, - pattern: &GraphPattern, - options: QueryOptions, - ) -> Result { - SimplePreparedQuery::new_from_pattern(self.inner.clone(), pattern, options) - //TODO: avoid clone + fn contains(&self, quad: &Quad) -> Result { + self.inner.contains(&quad.into()) + } + + #[must_use] + fn transaction(&self, f: impl FnOnce(&mut Self::Transaction) -> Result<()>) -> Result<()> { + let mut transaction = StoreRepositoryTransaction { + inner: self.inner.transaction(), + }; + f(&mut transaction)?; + transaction.inner.commit() + } + + fn load_graph( + &mut self, + reader: impl BufRead, + syntax: GraphSyntax, + to_graph_name: Option<&NamedOrBlankNode>, + base_iri: Option<&str>, + ) -> Result<()> { + let mut transaction = self.auto_transaction(); + transaction.load_graph(reader, syntax, to_graph_name, base_iri)?; + transaction.inner.commit() + } + + fn load_dataset( + &mut self, + reader: impl BufRead, + syntax: DatasetSyntax, + base_iri: Option<&str>, + ) -> Result<()> { + let mut transaction = self.auto_transaction(); + transaction.load_dataset(reader, syntax, base_iri)?; + transaction.inner.commit() } + fn insert(&mut self, quad: &Quad) -> Result<()> { + let mut transaction = self.auto_transaction(); + transaction.insert(&quad)?; + transaction.inner.commit() + } + + fn remove(&mut self, quad: &Quad) -> Result<()> { + let mut transaction = self.auto_transaction(); + transaction.remove(&quad)?; + transaction.inner.commit() + } +} + +pub struct StoreRepositoryTransaction { + inner: T, +} + +impl RepositoryTransaction for StoreRepositoryTransaction { fn load_graph( &mut self, reader: impl BufRead, @@ -140,26 +205,18 @@ impl RepositoryConnection for StoreRepositoryConnection { } } - fn contains(&self, quad: &Quad) -> Result { - self.inner.contains(&quad.into()) - } - fn insert(&mut self, quad: &Quad) -> Result<()> { - let mut transaction = self.inner.transaction()?; - let quad = transaction.encode_quad(quad)?; - transaction.insert(&quad)?; - transaction.commit() + let quad = self.inner.encode_quad(quad)?; + self.inner.insert(&quad) } fn remove(&mut self, quad: &Quad) -> Result<()> { - let mut transaction = self.inner.transaction()?; - let quad = transaction.encode_quad(quad)?; - transaction.remove(&quad)?; - transaction.commit() + let quad = quad.into(); + self.inner.remove(&quad) } } -impl StoreRepositoryConnection { +impl StoreRepositoryTransaction { fn load_from_triple_parser( &mut self, mut parser: P, @@ -168,32 +225,28 @@ impl StoreRepositoryConnection { where P::Error: Send + Sync + 'static, { - let mut transaction = self.inner.transaction()?; let mut bnode_map = HashMap::default(); let graph_name = if let Some(graph_name) = to_graph_name { - transaction.encode_named_or_blank_node(graph_name)? + self.inner.encode_named_or_blank_node(graph_name)? } else { EncodedTerm::DefaultGraph }; - let tr = &mut transaction; parser.parse_all(&mut move |t| { - let quad = tr.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?; - tr.insert(&quad) - })?; - transaction.commit() //TODO: partials commits + let quad = self + .inner + .encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?; + self.inner.insert(&quad) + }) } fn load_from_quad_parser(&mut self, mut parser: P) -> Result<()> where P::Error: Send + Sync + 'static, { - let mut transaction = self.inner.transaction()?; let mut bnode_map = HashMap::default(); - let tr = &mut transaction; parser.parse_all(&mut move |q| { - let quad = tr.encode_rio_quad(q, &mut bnode_map)?; - tr.insert(&quad) - })?; - transaction.commit() //TODO: partials commits + let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?; + self.inner.insert(&quad) + }) } } diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 64cc872f..3bbee60b 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -14,30 +14,36 @@ use std::str; /// To use it, the `"rocksdb"` feature need to be activated. /// /// Usage example: -/// ```ignored +/// ``` /// use oxigraph::model::*; -/// use oxigraph::{Repository, RepositoryConnection, RocksDbRepository, Result}; +/// use oxigraph::{Repository, RepositoryConnection, RepositoryTransaction, RocksDbRepository, Result}; /// use crate::oxigraph::sparql::{PreparedQuery, QueryOptions}; /// use oxigraph::sparql::QueryResult; +/// # use std::fs::remove_dir_all; /// -/// let repository = RocksDbRepository::open("example.db").unwrap(); -/// let mut connection = repository.connection().unwrap(); +/// # { +/// let repository = RocksDbRepository::open("example.db")?; +/// let mut connection = repository.connection()?; /// /// // insertion -/// let ex = NamedNode::parse("http://example.com").unwrap(); +/// let ex = NamedNode::parse("http://example.com")?; /// let quad = Quad::new(ex.clone(), ex.clone(), ex.clone(), None); -/// connection.insert(&quad); +/// connection.insert(&quad)?; /// /// // quad filter /// let results: Result> = connection.quads_for_pattern(None, None, None, None).collect(); -/// assert_eq!(vec![quad], results.unwrap()); +/// assert_eq!(vec![quad], results?); /// /// // SPARQL query -/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default()).unwrap(); -/// let results = prepared_query.exec().unwrap(); +/// let prepared_query = connection.prepare_query("SELECT ?s WHERE { ?s ?p ?o }", QueryOptions::default())?; +/// let results = prepared_query.exec()?; /// if let QueryResult::Bindings(results) = results { -/// assert_eq!(results.into_values_iter().next().unwrap().unwrap()[0], Some(ex.into())); +/// assert_eq!(results.into_values_iter().next().unwrap()?[0], Some(ex.into())); /// } +/// # +/// # } +/// # remove_dir_all("example.db")?; +/// # Result::Ok(()) /// ``` pub struct RocksDbRepository { inner: RocksDbStore, @@ -106,7 +112,7 @@ impl RocksDbStore { db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?, }; - let mut transaction = (&new).connection()?.transaction()?; + let mut transaction = (&new).connection()?.transaction(); transaction.set_first_strings()?; transaction.commit()?; @@ -143,13 +149,26 @@ impl StrLookup for RocksDbStoreConnection<'_> { impl<'a> StoreConnection for RocksDbStoreConnection<'a> { type Transaction = RocksDbStoreTransaction<'a>; + type AutoTransaction = RocksDbStoreAutoTransaction<'a>; + + fn transaction(&self) -> RocksDbStoreTransaction<'a> { + RocksDbStoreTransaction { + inner: RocksDbStoreInnerTransaction { + connection: self.clone(), + batch: WriteBatch::default(), + buffer: Vec::default(), + }, + } + } - fn transaction(&self) -> Result> { - Ok(RocksDbStoreTransaction { - connection: self.clone(), - batch: WriteBatch::default(), - buffer: Vec::default(), - }) + fn auto_transaction(&self) -> RocksDbStoreAutoTransaction<'a> { + RocksDbStoreAutoTransaction { + inner: RocksDbStoreInnerTransaction { + connection: self.clone(), + batch: WriteBatch::default(), + buffer: Vec::default(), + }, + } } fn contains(&self, quad: &EncodedQuad) -> Result { @@ -427,20 +446,81 @@ impl<'a> RocksDbStoreConnection<'a> { } pub struct RocksDbStoreTransaction<'a> { + inner: RocksDbStoreInnerTransaction<'a>, +} + +impl StrContainer for RocksDbStoreTransaction<'_> { + fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { + self.inner.insert_str(key, value) + } +} + +impl StoreTransaction for RocksDbStoreTransaction<'_> { + fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.insert(quad) + } + + fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.remove(quad) + } + + fn commit(self) -> Result<()> { + self.inner.commit() + } +} + +pub struct RocksDbStoreAutoTransaction<'a> { + inner: RocksDbStoreInnerTransaction<'a>, +} + +impl StrContainer for RocksDbStoreAutoTransaction<'_> { + fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { + self.inner.insert_str(key, value) + } +} + +impl StoreTransaction for RocksDbStoreAutoTransaction<'_> { + fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.insert(quad)?; + self.commit_if_big() + } + + fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.remove(quad)?; + self.commit_if_big() + } + + fn commit(self) -> Result<()> { + self.inner.commit() + } +} + +impl RocksDbStoreAutoTransaction<'_> { + fn commit_if_big(&mut self) -> Result<()> { + if self.inner.batch.len() > MAX_TRANSACTION_SIZE { + self.inner + .connection + .store + .db + .write(replace(&mut self.inner.batch, WriteBatch::default()))?; + } + Ok(()) + } +} + +struct RocksDbStoreInnerTransaction<'a> { connection: RocksDbStoreConnection<'a>, batch: WriteBatch, buffer: Vec, } -impl StrContainer for RocksDbStoreTransaction<'_> { +impl RocksDbStoreInnerTransaction<'_> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { self.batch .put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value)?; Ok(()) } -} -impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { self.buffer.write_spog_quad(quad)?; self.batch @@ -472,13 +552,6 @@ impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> { .put_cf(self.connection.gosp_cf, &self.buffer, &EMPTY_BUF)?; self.buffer.clear(); - if self.batch.len() > MAX_TRANSACTION_SIZE { - self.connection - .store - .db - .write(replace(&mut self.batch, WriteBatch::default()))?; - } - Ok(()) } @@ -513,13 +586,6 @@ impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> { .delete_cf(self.connection.gosp_cf, &self.buffer)?; self.buffer.clear(); - if self.batch.len() > MAX_TRANSACTION_SIZE { - self.connection - .store - .db - .write(replace(&mut self.batch, WriteBatch::default()))?; - } - Ok(()) } @@ -593,7 +659,7 @@ impl<'a, F: Fn(&[u8]) -> Result> Iterator for DecodingIndexIterator #[test] fn repository() -> Result<()> { use crate::model::*; - use crate::repository::RepositoryConnection; + use crate::*; use rand::random; use std::env::temp_dir; use std::fs::remove_dir_all; diff --git a/wikibase/src/loader.rs b/wikibase/src/loader.rs index d38ff3f7..71e6b1e3 100644 --- a/wikibase/src/loader.rs +++ b/wikibase/src/loader.rs @@ -1,7 +1,7 @@ use crate::SERVER; use chrono::{DateTime, Utc}; use oxigraph::model::NamedNode; -use oxigraph::{GraphSyntax, Repository, RepositoryConnection, Result}; +use oxigraph::*; use reqwest::header::USER_AGENT; use reqwest::{Client, Url}; use serde_json::Value; @@ -192,21 +192,22 @@ impl WikibaseLoader { } fn load_entity_data(&self, uri: &str, data: impl Read) -> Result<()> { - let mut connection = self.repository.connection()?; + let connection = self.repository.connection()?; let graph_name = NamedNode::parse(uri)?.into(); + connection.transaction(|transaction| { + let to_remove = connection + .quads_for_pattern(None, None, None, Some(Some(&graph_name))) + .collect::>>()?; + for q in to_remove { + transaction.remove(&q)?; + } - let to_remove = connection - .quads_for_pattern(None, None, None, Some(Some(&graph_name))) - .collect::>>()?; - for q in to_remove { - connection.remove(&q)?; - } - - connection.load_graph( - BufReader::new(data), - GraphSyntax::NTriples, - Some(&NamedNode::parse(uri)?.into()), - None, - ) + transaction.load_graph( + BufReader::new(data), + GraphSyntax::NTriples, + Some(&NamedNode::parse(uri)?.into()), + None, + ) + }) } }