Makes RocksDBStore::quads_for_pattern static

pull/46/head
Tpt 5 years ago
parent fd1bb216be
commit 2cc5e39f94
  1. 438
      lib/src/store/rocksdb.rs
  2. 2
      wikibase/src/loader.rs

@ -1,13 +1,15 @@
//! Store based on the [RocksDB](https://rocksdb.org/) key-value database. //! Store based on the [RocksDB](https://rocksdb.org/) key-value database.
use crate::error::UnwrapInfallible;
use crate::model::*; use crate::model::*;
use crate::sparql::{GraphPattern, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{GraphPattern, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore}; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore};
use crate::{DatasetSyntax, GraphSyntax, Result}; use crate::{DatasetSyntax, GraphSyntax, Result};
use rocksdb::*; use rocksdb::*;
use std::convert::Infallible;
use std::io::{BufRead, Cursor}; use std::io::{BufRead, Cursor};
use std::mem::take; use std::mem::{take, transmute};
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, str}; use std::{fmt, str};
@ -43,7 +45,7 @@ use std::{fmt, str};
/// assert_eq!(solutions.next().unwrap()?.get("s"), Some(&ex.into())); /// assert_eq!(solutions.next().unwrap()?.get("s"), Some(&ex.into()));
/// } /// }
/// # /// #
/// # } /// # };
/// # remove_dir_all("example.db")?; /// # remove_dir_all("example.db")?;
/// # Result::Ok(()) /// # Result::Ok(())
/// ``` /// ```
@ -68,18 +70,6 @@ const COLUMN_FAMILIES: [&str; 7] = [
const MAX_TRANSACTION_SIZE: usize = 1024; const MAX_TRANSACTION_SIZE: usize = 1024;
#[derive(Clone)]
struct RocksDbStoreHandle<'a> {
db: &'a DB,
id2str_cf: &'a ColumnFamily,
spog_cf: &'a ColumnFamily,
posg_cf: &'a ColumnFamily,
ospg_cf: &'a ColumnFamily,
gspo_cf: &'a ColumnFamily,
gpos_cf: &'a ColumnFamily,
gosp_cf: &'a ColumnFamily,
}
impl RocksDbStore { impl RocksDbStore {
/// Opens a `RocksDbStore` /// Opens a `RocksDbStore`
pub fn open(path: impl AsRef<Path>) -> Result<Self> { pub fn open(path: impl AsRef<Path>) -> Result<Self> {
@ -92,9 +82,9 @@ impl RocksDbStore {
db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?), db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?),
}; };
let mut transaction = new.handle().auto_transaction(); let mut transaction = new.auto_batch_writer();
transaction.set_first_strings()?; transaction.set_first_strings()?;
transaction.commit()?; transaction.apply()?;
Ok(new) Ok(new)
} }
@ -130,42 +120,39 @@ impl RocksDbStore {
/// Retrieves quads with a filter on each quad component /// Retrieves quads with a filter on each quad component
/// ///
/// See `MemoryStore` for a usage example. /// See `MemoryStore` for a usage example.
pub fn quads_for_pattern<'a>( pub fn quads_for_pattern(
&'a self, &self,
subject: Option<&NamedOrBlankNode>, subject: Option<&NamedOrBlankNode>,
predicate: Option<&NamedNode>, predicate: Option<&NamedNode>,
object: Option<&Term>, object: Option<&Term>,
graph_name: Option<&GraphName>, graph_name: Option<&GraphName>,
) -> impl Iterator<Item = Result<Quad>> + 'a ) -> impl Iterator<Item = Result<Quad>> {
where
Self: 'a,
{
let subject = subject.map(|s| s.into()); let subject = subject.map(|s| s.into());
let predicate = predicate.map(|p| p.into()); let predicate = predicate.map(|p| p.into());
let object = object.map(|o| o.into()); let object = object.map(|o| o.into());
let graph_name = graph_name.map(|g| g.into()); let graph_name = graph_name.map(|g| g.into());
self.handle() let store = self.clone();
.encoded_quads_for_pattern(subject, predicate, object, graph_name) self.encoded_quads_for_pattern(subject, predicate, object, graph_name)
.map(move |quad| self.decode_quad(&quad?)) .map(move |quad| store.decode_quad(&quad?))
} }
/// Checks if this store contains a given quad /// Checks if this store contains a given quad
pub fn contains(&self, quad: &Quad) -> Result<bool> { pub fn contains(&self, quad: &Quad) -> Result<bool> {
let quad = quad.into(); let quad = quad.into();
self.handle().contains(&quad) self.contains_encoded(&quad)
} }
/// Returns the number of quads in the store /// Returns the number of quads in the store
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.db self.db
.full_iterator_cf(self.handle().spog_cf, IteratorMode::Start) .full_iterator_cf(self.spog_cf(), IteratorMode::Start)
.count() .count()
} }
/// Returns if the store is empty /// Returns if the store is empty
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.db self.db
.full_iterator_cf(self.handle().spog_cf, IteratorMode::Start) .full_iterator_cf(self.spog_cf(), IteratorMode::Start)
.next() .next()
.is_none() .is_none()
} }
@ -180,9 +167,15 @@ impl RocksDbStore {
&'a self, &'a self,
f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>, f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>,
) -> Result<()> { ) -> Result<()> {
let mut transaction = self.handle().transaction(); let mut transaction = RocksDbTransaction {
inner: BatchWriter {
store: self,
batch: WriteBatch::default(),
buffer: Vec::default(),
},
};
f(&mut transaction)?; f(&mut transaction)?;
transaction.commit() transaction.inner.apply()
} }
/// Loads a graph file (i.e. triples) into the store /// Loads a graph file (i.e. triples) into the store
@ -198,9 +191,9 @@ impl RocksDbStore {
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
let mut transaction = self.handle().auto_transaction(); let mut transaction = self.auto_batch_writer();
load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?; load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?;
transaction.commit() transaction.apply()
} }
/// Loads a dataset file (i.e. quads) into the store. /// Loads a dataset file (i.e. quads) into the store.
@ -215,102 +208,69 @@ impl RocksDbStore {
syntax: DatasetSyntax, syntax: DatasetSyntax,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
let mut transaction = self.handle().auto_transaction(); let mut transaction = self.auto_batch_writer();
load_dataset(&mut transaction, reader, syntax, base_iri)?; load_dataset(&mut transaction, reader, syntax, base_iri)?;
transaction.commit() transaction.apply()
} }
/// Adds a quad to this store. /// Adds a quad to this store.
pub fn insert(&self, quad: &Quad) -> Result<()> { pub fn insert(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle().auto_transaction(); let mut transaction = self.auto_batch_writer();
let quad = transaction.encode_quad(quad)?; let quad = transaction.encode_quad(quad)?;
transaction.insert_encoded(&quad)?; transaction.insert_encoded(&quad)?;
transaction.commit() transaction.apply()
} }
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<()> { pub fn remove(&self, quad: &Quad) -> Result<()> {
let mut transaction = self.handle().auto_transaction(); let mut transaction = self.auto_batch_writer();
let quad = quad.into(); let quad = quad.into();
transaction.remove_encoded(&quad)?; transaction.remove_encoded(&quad)?;
transaction.commit() transaction.apply()
} }
fn handle(&self) -> RocksDbStoreHandle<'_> { fn id2str_cf(&self) -> &ColumnFamily {
RocksDbStoreHandle { get_cf(&self.db, ID2STR_CF)
db: &self.db,
id2str_cf: get_cf(&self.db, ID2STR_CF),
spog_cf: get_cf(&self.db, SPOG_CF),
posg_cf: get_cf(&self.db, POSG_CF),
ospg_cf: get_cf(&self.db, OSPG_CF),
gspo_cf: get_cf(&self.db, GSPO_CF),
gpos_cf: get_cf(&self.db, GPOS_CF),
gosp_cf: get_cf(&self.db, GOSP_CF),
}
}
} }
impl fmt::Display for RocksDbStore { fn spog_cf(&self) -> &ColumnFamily {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { get_cf(&self.db, SPOG_CF)
for t in self.quads_for_pattern(None, None, None, None) {
writeln!(f, "{}", t.map_err(|_| fmt::Error)?)?;
}
Ok(())
}
} }
impl StrLookup for RocksDbStore { fn posg_cf(&self) -> &ColumnFamily {
type Error = crate::Error; get_cf(&self.db, POSG_CF)
fn get_str(&self, id: StrHash) -> Result<Option<String>> {
Ok(self
.db
.get_cf(get_cf(&self.db, ID2STR_CF), &id.to_be_bytes())?
.map(String::from_utf8)
.transpose()?)
}
} }
impl ReadableEncodedStore for RocksDbStore { fn ospg_cf(&self) -> &ColumnFamily {
fn encoded_quads_for_pattern<'a>( get_cf(&self.db, OSPG_CF)
&'a self,
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
Box::new(
self.handle()
.encoded_quads_for_pattern(subject, predicate, object, graph_name),
)
} }
fn gspo_cf(&self) -> &ColumnFamily {
get_cf(&self.db, GSPO_CF)
} }
impl<'a> RocksDbStoreHandle<'a> { fn gpos_cf(&self) -> &ColumnFamily {
fn transaction(&self) -> RocksDbTransaction<'a> { get_cf(&self.db, GPOS_CF)
RocksDbTransaction {
inner: RocksDbInnerTransaction {
handle: self.clone(),
batch: WriteBatch::default(),
buffer: Vec::default(),
},
} }
fn gosp_cf(&self) -> &ColumnFamily {
get_cf(&self.db, GOSP_CF)
} }
fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> { fn auto_batch_writer(&self) -> AutoBatchWriter<'_> {
RocksDbAutoTransaction { AutoBatchWriter {
inner: RocksDbInnerTransaction { inner: BatchWriter {
handle: self.clone(), store: self,
batch: WriteBatch::default(), batch: WriteBatch::default(),
buffer: Vec::default(), buffer: Vec::default(),
}, },
} }
} }
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains_encoded(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_spog_quad(&mut buffer, quad); write_spog_quad(&mut buffer, quad);
Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some()) Ok(self.db.get_pinned_cf(self.spog_cf(), &buffer)?.is_some())
} }
fn encoded_quads_for_pattern( fn encoded_quads_for_pattern(
@ -319,7 +279,7 @@ impl<'a> RocksDbStoreHandle<'a> {
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>, object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>, graph_name: Option<EncodedTerm>,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
match subject { match subject {
Some(subject) => match predicate { Some(subject) => match predicate {
Some(predicate) => match object { Some(predicate) => match object {
@ -376,11 +336,11 @@ impl<'a> RocksDbStoreHandle<'a> {
} }
} }
fn quads(&self) -> DecodingIndexIterator<'a> { fn quads(&self) -> DecodingIndexIterator {
self.spog_quads(Vec::default()) self.spog_quads(Vec::default())
} }
fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator<'a> { fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator {
self.spog_quads(encode_term(subject)) self.spog_quads(encode_term(subject))
} }
@ -388,7 +348,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.spog_quads(encode_term_pair(subject, predicate)) self.spog_quads(encode_term_pair(subject, predicate))
} }
@ -397,7 +357,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.spog_quads(encode_term_triple(subject, predicate, object)) self.spog_quads(encode_term_triple(subject, predicate, object))
} }
@ -405,11 +365,11 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.ospg_quads(encode_term_pair(object, subject)) self.ospg_quads(encode_term_pair(object, subject))
} }
fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator<'a> { fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator {
self.posg_quads(encode_term(predicate)) self.posg_quads(encode_term(predicate))
} }
@ -417,15 +377,15 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.posg_quads(encode_term_pair(predicate, object)) self.posg_quads(encode_term_pair(predicate, object))
} }
fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator<'a> { fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator {
self.ospg_quads(encode_term(object)) self.ospg_quads(encode_term(object))
} }
fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator<'a> { fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator {
self.gspo_quads(encode_term(graph_name)) self.gspo_quads(encode_term(graph_name))
} }
@ -433,7 +393,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gspo_quads(encode_term_pair(graph_name, subject)) self.gspo_quads(encode_term_pair(graph_name, subject))
} }
@ -442,7 +402,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) self.gspo_quads(encode_term_triple(graph_name, subject, predicate))
} }
@ -451,7 +411,7 @@ impl<'a> RocksDbStoreHandle<'a> {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gosp_quads(encode_term_triple(graph_name, object, subject)) self.gosp_quads(encode_term_triple(graph_name, object, subject))
} }
@ -459,7 +419,7 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gpos_quads(encode_term_pair(graph_name, predicate)) self.gpos_quads(encode_term_pair(graph_name, predicate))
} }
@ -468,7 +428,7 @@ impl<'a> RocksDbStoreHandle<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gpos_quads(encode_term_triple(graph_name, predicate, object)) self.gpos_quads(encode_term_triple(graph_name, predicate, object))
} }
@ -476,86 +436,99 @@ impl<'a> RocksDbStoreHandle<'a> {
&self, &self,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
self.gosp_quads(encode_term_pair(graph_name, object)) self.gosp_quads(encode_term_pair(graph_name, object))
} }
fn spog_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn spog_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.spog_cf, prefix, QuadEncoding::SPOG) self.inner_quads(self.spog_cf(), prefix, QuadEncoding::SPOG)
} }
fn posg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn posg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.posg_cf, prefix, QuadEncoding::POSG) self.inner_quads(self.posg_cf(), prefix, QuadEncoding::POSG)
} }
fn ospg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn ospg_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.ospg_cf, prefix, QuadEncoding::OSPG) self.inner_quads(self.ospg_cf(), prefix, QuadEncoding::OSPG)
} }
fn gspo_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn gspo_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.gspo_cf, prefix, QuadEncoding::GSPO) self.inner_quads(self.gspo_cf(), prefix, QuadEncoding::GSPO)
} }
fn gpos_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn gpos_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.gpos_cf, prefix, QuadEncoding::GPOS) self.inner_quads(self.gpos_cf(), prefix, QuadEncoding::GPOS)
} }
fn gosp_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator<'a> { fn gosp_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.gosp_cf, prefix, QuadEncoding::GOSP) self.inner_quads(self.gosp_cf(), prefix, QuadEncoding::GOSP)
} }
#[allow(unsafe_code)]
fn inner_quads( fn inner_quads(
&self, &self,
cf: &ColumnFamily, cf: &ColumnFamily,
prefix: Vec<u8>, prefix: Vec<u8>,
encoding: QuadEncoding, encoding: QuadEncoding,
) -> DecodingIndexIterator<'a> { ) -> DecodingIndexIterator {
let mut iter = self.db.raw_iterator_cf(cf); let mut iter = self.db.raw_iterator_cf(cf);
iter.seek(&prefix); iter.seek(&prefix);
DecodingIndexIterator { DecodingIndexIterator {
iter, iter: unsafe { StaticDBRowIterator::new(iter, self.db.clone()) }, // This is safe because the iterator belongs to DB
prefix, prefix,
encoding, encoding,
} }
} }
} }
/// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) for the `RocksDbStore`. impl fmt::Display for RocksDbStore {
pub struct RocksDbPreparedQuery(SimplePreparedQuery<RocksDbStore>); fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for t in self.quads_for_pattern(None, None, None, None) {
impl RocksDbPreparedQuery { writeln!(f, "{}", t.map_err(|_| fmt::Error)?)?;
/// Evaluates the query and returns its results
pub fn exec(&self) -> Result<QueryResult<'_>> {
self.0.exec()
} }
Ok(())
} }
/// Allows inserting and deleting quads during a transaction with the `RocksDbStore`.
pub struct RocksDbTransaction<'a> {
inner: RocksDbInnerTransaction<'a>,
} }
impl StrContainer for RocksDbTransaction<'_> { impl StrLookup for RocksDbStore {
type Error = crate::Error; type Error = crate::Error;
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn get_str(&self, id: StrHash) -> Result<Option<String>> {
self.inner.insert_str(key, value); Ok(self
Ok(()) .db
.get_cf(self.id2str_cf(), &id.to_be_bytes())?
.map(String::from_utf8)
.transpose()?)
} }
} }
impl WritableEncodedStore for RocksDbTransaction<'_> { impl ReadableEncodedStore for RocksDbStore {
type Error = crate::Error; fn encoded_quads_for_pattern<'a>(
&'a self,
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { subject: Option<EncodedTerm>,
self.inner.insert(quad) predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a> {
Box::new(self.encoded_quads_for_pattern(subject, predicate, object, graph_name))
}
} }
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { /// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) for the `RocksDbStore`.
self.inner.remove(quad) pub struct RocksDbPreparedQuery(SimplePreparedQuery<RocksDbStore>);
impl RocksDbPreparedQuery {
/// Evaluates the query and returns its results
pub fn exec(&self) -> Result<QueryResult<'_>> {
self.0.exec()
} }
} }
/// Allows inserting and deleting quads during a transaction with the `RocksDbStore`.
pub struct RocksDbTransaction<'a> {
inner: BatchWriter<'a>,
}
impl RocksDbTransaction<'_> { impl RocksDbTransaction<'_> {
/// Loads a graph file (i.e. triples) into the store during the transaction. /// Loads a graph file (i.e. triples) into the store during the transaction.
/// ///
@ -571,7 +544,7 @@ impl RocksDbTransaction<'_> {
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
load_graph(self, reader, syntax, to_graph_name, base_iri) load_graph(&mut self.inner, reader, syntax, to_graph_name, base_iri)
} }
/// Loads a dataset file (i.e. quads) into the store. into the store during the transaction. /// Loads a dataset file (i.e. quads) into the store. into the store during the transaction.
@ -587,136 +560,139 @@ impl RocksDbTransaction<'_> {
syntax: DatasetSyntax, syntax: DatasetSyntax,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
load_dataset(self, reader, syntax, base_iri) load_dataset(&mut self.inner, reader, syntax, base_iri)
} }
/// Adds a quad to this store during the transaction. /// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) -> Result<()> { pub fn insert(&mut self, quad: &Quad) {
let quad = self.encode_quad(quad)?; let quad = self.inner.encode_quad(quad).unwrap_infallible();
self.insert_encoded(&quad) self.inner.insert_encoded(&quad).unwrap_infallible()
} }
/// Removes a quad from this store during the transaction. /// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) -> Result<()> { pub fn remove(&mut self, quad: &Quad) {
let quad = quad.into(); let quad = quad.into();
self.remove_encoded(&quad) self.inner.remove_encoded(&quad).unwrap_infallible()
}
fn commit(self) -> Result<()> {
self.inner.commit()
} }
} }
struct RocksDbAutoTransaction<'a> { struct BatchWriter<'a> {
inner: RocksDbInnerTransaction<'a>, store: &'a RocksDbStore,
batch: WriteBatch,
buffer: Vec<u8>,
} }
impl StrContainer for RocksDbAutoTransaction<'_> { impl StrContainer for BatchWriter<'_> {
type Error = crate::Error; type Error = Infallible;
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn insert_str(&mut self, key: StrHash, value: &str) -> std::result::Result<(), Infallible> {
self.inner.insert_str(key, value); self.batch
.put_cf(self.store.id2str_cf(), &key.to_be_bytes(), value);
Ok(()) Ok(())
} }
} }
impl WritableEncodedStore for RocksDbAutoTransaction<'_> { impl WritableEncodedStore for BatchWriter<'_> {
type Error = crate::Error; type Error = Infallible;
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.insert(quad)?;
self.commit_if_big()
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove(quad)?;
self.commit_if_big()
}
}
impl RocksDbAutoTransaction<'_> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> {
fn commit(self) -> Result<()> {
self.inner.commit()
}
fn commit_if_big(&mut self) -> Result<()> {
if self.inner.batch.len() > MAX_TRANSACTION_SIZE {
self.inner.handle.db.write(take(&mut self.inner.batch))?;
}
Ok(())
}
}
struct RocksDbInnerTransaction<'a> {
handle: RocksDbStoreHandle<'a>,
batch: WriteBatch,
buffer: Vec<u8>,
}
impl RocksDbInnerTransaction<'_> {
fn insert_str(&mut self, key: StrHash, value: &str) {
self.batch
.put_cf(self.handle.id2str_cf, &key.to_be_bytes(), value)
}
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.spog_cf, &self.buffer, &[]); self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
write_posg_quad(&mut self.buffer, quad); write_posg_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.posg_cf, &self.buffer, &[]); self.batch.put_cf(self.store.posg_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad); write_ospg_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.ospg_cf, &self.buffer, &[]); self.batch.put_cf(self.store.ospg_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad); write_gspo_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.gspo_cf, &self.buffer, &[]); self.batch.put_cf(self.store.gspo_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad); write_gpos_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.gpos_cf, &self.buffer, &[]); self.batch.put_cf(self.store.gpos_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.handle.gosp_cf, &self.buffer, &[]); self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
} }
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.spog_cf, &self.buffer); self.batch.delete_cf(self.store.spog_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
write_posg_quad(&mut self.buffer, quad); write_posg_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.posg_cf, &self.buffer); self.batch.delete_cf(self.store.posg_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad); write_ospg_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.ospg_cf, &self.buffer); self.batch.delete_cf(self.store.ospg_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad); write_gspo_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.gspo_cf, &self.buffer); self.batch.delete_cf(self.store.gspo_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad); write_gpos_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.gpos_cf, &self.buffer); self.batch.delete_cf(self.store.gpos_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.handle.gosp_cf, &self.buffer); self.batch.delete_cf(self.store.gosp_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
} }
}
impl BatchWriter<'_> {
fn apply(self) -> Result<()> {
Ok(self.store.db.write(self.batch)?)
}
}
struct AutoBatchWriter<'a> {
inner: BatchWriter<'a>,
}
impl StrContainer for AutoBatchWriter<'_> {
type Error = crate::Error;
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
Ok(self.inner.insert_str(key, value)?)
}
}
impl WritableEncodedStore for AutoBatchWriter<'_> {
type Error = crate::Error;
fn commit(self) -> Result<()> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.handle.db.write(self.batch)?; self.inner.insert_encoded(quad)?;
self.apply_if_big()
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
self.inner.remove_encoded(quad)?;
self.apply_if_big()
}
}
impl AutoBatchWriter<'_> {
fn apply(self) -> Result<()> {
self.inner.apply()
}
fn apply_if_big(&mut self) -> Result<()> {
if self.inner.batch.len() > MAX_TRANSACTION_SIZE {
self.inner.store.db.write(take(&mut self.inner.batch))?;
}
Ok(()) Ok(())
} }
} }
@ -757,20 +733,40 @@ fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: Encod
vec vec
} }
struct DecodingIndexIterator<'a> { struct StaticDBRowIterator {
iter: DBRawIterator<'a>, iter: DBRawIterator<'static>,
_db: Arc<DB>, // needed to ensure that DB still lives while iter is used
}
impl StaticDBRowIterator {
/// Creates a static iterator from a non static one by keeping a ARC reference to the database
/// Caller must unsure that the iterator belongs to the same database
///
/// This unsafe method is required to get static iterators and ease the usage of the library
/// and make streaming Python bindings possible
#[allow(unsafe_code)]
unsafe fn new(iter: DBRawIterator<'_>, db: Arc<DB>) -> Self {
Self {
iter: transmute(iter),
_db: db,
}
}
}
struct DecodingIndexIterator {
iter: StaticDBRowIterator,
prefix: Vec<u8>, prefix: Vec<u8>,
encoding: QuadEncoding, encoding: QuadEncoding,
} }
impl<'a> Iterator for DecodingIndexIterator<'a> { impl Iterator for DecodingIndexIterator {
type Item = Result<EncodedQuad>; type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
if let Some(key) = self.iter.key() { if let Some(key) = self.iter.iter.key() {
if key.starts_with(&self.prefix) { if key.starts_with(&self.prefix) {
let result = self.encoding.decode(key).map_err(crate::Error::from); let result = self.encoding.decode(key).map_err(crate::Error::from);
self.iter.next(); self.iter.iter.next();
Some(result) Some(result)
} else { } else {
None None

@ -266,7 +266,7 @@ impl WikibaseLoader {
.quads_for_pattern(None, None, None, Some(&graph_name)) .quads_for_pattern(None, None, None, Some(&graph_name))
.collect::<oxigraph::Result<Vec<_>>>()?; .collect::<oxigraph::Result<Vec<_>>>()?;
for q in to_remove { for q in to_remove {
transaction.remove(&q)?; transaction.remove(&q);
} }
transaction.load_graph( transaction.load_graph(

Loading…
Cancel
Save