Makes Storage edit methods take explicit terms instead of encoded ones

Allows abstracting the insertion process away from SPARQL

Also adds some optimized methods for SPARQL UPDATE operations
pull/171/head
Tpt 4 years ago
parent ddc8eb584a
commit fa7ae0353f
  1. 79
      lib/src/sparql/update.rs
  2. 11
      lib/src/storage/io.rs
  3. 144
      lib/src/storage/mod.rs
  4. 26
      lib/src/store.rs

@ -12,7 +12,7 @@ use crate::sparql::plan::EncodedTuple;
use crate::sparql::plan_builder::PlanBuilder;
use crate::sparql::{EvaluationError, UpdateOptions};
use crate::storage::io::load_graph;
use crate::storage::numeric_encoder::{Decoder, EncodedTerm, WriteEncoder};
use crate::storage::numeric_encoder::{Decoder, EncodedTerm};
use crate::storage::Storage;
use http::header::{ACCEPT, CONTENT_TYPE, USER_AGENT};
use http::{Method, Request, StatusCode};
@ -96,8 +96,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
let mut bnodes = HashMap::new();
for quad in data {
let quad = self.convert_quad(quad, &mut bnodes);
let quad = self.storage.encode_quad(quad.as_ref())?;
self.storage.insert(&quad)?;
self.storage.insert(quad.as_ref())?;
}
Ok(())
}
@ -105,7 +104,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
fn eval_delete_data(&mut self, data: &[GroundQuad]) -> Result<(), EvaluationError> {
for quad in data {
let quad = self.convert_ground_quad(quad);
self.storage.remove(&quad.as_ref().into())?;
self.storage.remove(quad.as_ref())?;
}
Ok(())
}
@ -131,15 +130,14 @@ impl<'a> SimpleUpdateEvaluator<'a> {
if let Some(quad) =
self.convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)?
{
self.storage.remove(&quad.as_ref().into())?;
self.storage.remove(quad.as_ref())?;
}
}
for quad in insert {
if let Some(quad) =
self.convert_quad_pattern(quad, &variables, &tuple, &dataset, &mut bnodes)?
{
let quad = self.storage.encode_quad(quad.as_ref())?;
self.storage.insert(&quad)?;
self.storage.insert(quad.as_ref())?;
}
}
bnodes.clear();
@ -196,32 +194,23 @@ impl<'a> SimpleUpdateEvaluator<'a> {
}
fn eval_create(&mut self, graph_name: &NamedNode, silent: bool) -> Result<(), EvaluationError> {
let encoded_graph_name = self
.storage
.encode_named_node(NamedNodeRef::new_unchecked(&graph_name.iri))?;
if self.storage.contains_named_graph(&encoded_graph_name)? {
if silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
"The graph {} already exists",
graph_name
)))
}
} else {
self.storage.insert_named_graph(&encoded_graph_name)?;
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.insert_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
"The graph {} already exists",
graph_name
)))
}
}
fn eval_clear(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> {
match graph {
GraphTarget::NamedNode(graph_name) => {
let encoded_graph_name = self
.storage
.encode_named_node(NamedNodeRef::new_unchecked(&graph_name.iri))?;
if self.storage.contains_named_graph(&encoded_graph_name)? {
Ok(self.storage.clear_graph(&encoded_graph_name)?)
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.contains_named_graph(&graph_name.into())? {
Ok(self.storage.clear_graph(graph_name.into())?)
} else if silent {
Ok(())
} else {
@ -232,53 +221,31 @@ impl<'a> SimpleUpdateEvaluator<'a> {
}
}
GraphTarget::DefaultGraph => {
Ok(self.storage.clear_graph(&EncodedTerm::DefaultGraph)?)
}
GraphTarget::NamedGraphs => {
// TODO: optimize?
for graph in self.storage.named_graphs() {
self.storage.clear_graph(&graph?)?;
}
self.storage.clear_graph(GraphNameRef::DefaultGraph)?;
Ok(())
}
GraphTarget::AllGraphs => {
// TODO: optimize?
for graph in self.storage.named_graphs() {
self.storage.clear_graph(&graph?)?;
}
Ok(self.storage.clear_graph(&EncodedTerm::DefaultGraph)?)
}
GraphTarget::NamedGraphs => Ok(self.storage.clear_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.storage.clear_all_graphs()?),
}
}
fn eval_drop(&mut self, graph: &GraphTarget, silent: bool) -> Result<(), EvaluationError> {
match graph {
GraphTarget::NamedNode(graph_name) => {
let encoded_graph_name = self
.storage
.encode_named_node(NamedNodeRef::new_unchecked(&graph_name.iri))?;
if self.storage.contains_named_graph(&encoded_graph_name)? {
self.storage.remove_named_graph(&encoded_graph_name)?;
Ok(())
} else if silent {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.remove_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
"The graph {} does not exists",
graph
graph_name
)))
}
}
GraphTarget::DefaultGraph => {
Ok(self.storage.clear_graph(&EncodedTerm::DefaultGraph)?)
}
GraphTarget::NamedGraphs => {
// TODO: optimize?
for graph in self.storage.named_graphs() {
self.storage.remove_named_graph(&graph?)?;
}
Ok(())
Ok(self.storage.clear_graph(GraphNameRef::DefaultGraph)?)
}
GraphTarget::NamedGraphs => Ok(self.storage.remove_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.storage.clear()?),
}
}

@ -3,7 +3,6 @@
use crate::error::invalid_input_error;
use crate::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer};
use crate::model::{BlankNode, GraphNameRef, LiteralRef, NamedNodeRef, Quad, QuadRef, Triple};
use crate::storage::numeric_encoder::WriteEncoder;
use crate::storage::StorageLike;
use oxiri::Iri;
use rio_api::model as rio;
@ -49,10 +48,9 @@ where
{
let mut bnode_map = HashMap::default();
parser.parse_all(&mut move |t| {
let quad = storage
.encode_quad(quad_from_rio_triple(&t, to_graph_name, &mut bnode_map))
storage
.insert(quad_from_rio_triple(&t, to_graph_name, &mut bnode_map))
.map_err(StoreOrParseError::Store)?;
storage.insert(&quad).map_err(StoreOrParseError::Store)?;
Ok(())
})
}
@ -131,10 +129,9 @@ where
{
let mut bnode_map = HashMap::default();
parser.parse_all(&mut move |q| {
let quad = store
.encode_quad(quad_from_rio(&q, &mut bnode_map))
store
.insert(quad_from_rio(&q, &mut bnode_map))
.map_err(StoreOrParseError::Store)?;
store.insert(&quad).map_err(StoreOrParseError::Store)?;
Ok(())
})
}

@ -10,6 +10,7 @@ use sled::transaction::{
use sled::{Config, Db, Iter, Transactional, Tree};
use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef};
use crate::sparql::EvaluationError;
use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
@ -18,7 +19,9 @@ use crate::storage::binary_encoder::{
LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE,
};
use crate::storage::io::StoreOrParseError;
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrContainer, StrHash, StrLookup};
use crate::storage::numeric_encoder::{
EncodedQuad, EncodedTerm, StrContainer, StrHash, StrLookup, WriteEncoder,
};
mod binary_encoder;
pub(crate) mod io;
@ -74,7 +77,7 @@ impl Storage {
for quad in this.quads() {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
this.insert_named_graph(&quad.graph_name)?;
this.insert_encoded_named_graph(&quad.graph_name)?;
}
}
version = 1;
@ -457,49 +460,50 @@ impl Storage {
}
}
pub fn insert(&self, quad: &EncodedQuad) -> std::io::Result<bool> {
pub fn insert(&self, quad: QuadRef<'_>) -> std::io::Result<bool> {
let quad = self.encode_quad(quad)?;
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
write_spo_quad(&mut buffer, &quad);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
write_pos_quad(&mut buffer, quad);
write_pos_quad(&mut buffer, &quad);
self.dpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_osp_quad(&mut buffer, quad);
write_osp_quad(&mut buffer, &quad);
self.dosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, quad);
write_spog_quad(&mut buffer, &quad);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
write_posg_quad(&mut buffer, quad);
write_posg_quad(&mut buffer, &quad);
self.posg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
write_ospg_quad(&mut buffer, &quad);
self.ospg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
write_gspo_quad(&mut buffer, &quad);
self.gspo.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
write_gpos_quad(&mut buffer, &quad);
self.gpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
write_gosp_quad(&mut buffer, &quad);
self.gosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
@ -512,7 +516,11 @@ impl Storage {
}
}
pub fn remove(&self, quad: &EncodedQuad) -> std::io::Result<bool> {
/// Public removal entry point that takes an explicit `QuadRef` rather than
/// an already-encoded quad.
///
/// The quad is converted with `.into()` to the `EncodedQuad` expected by
/// `remove_encoded`, and that method's `std::io::Result<bool>` is returned
/// unchanged.
pub fn remove(&self, quad: QuadRef<'_>) -> std::io::Result<bool> {
// The conversion target is inferred from `remove_encoded`'s `&EncodedQuad` parameter.
self.remove_encoded(&quad.into())
}
fn remove_encoded(&self, quad: &EncodedQuad) -> std::io::Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
@ -564,30 +572,69 @@ impl Storage {
}
}
pub fn insert_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
/// Public entry point for registering a graph name given as an explicit
/// `NamedOrBlankNodeRef`.
///
/// The name is converted with `.into()` to the `EncodedTerm` expected by
/// `insert_encoded_named_graph`, whose result is returned unchanged.
pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
self.insert_encoded_named_graph(&graph_name.into())
}
/// Private counterpart of `insert_named_graph` working on an already-encoded
/// graph name.
///
/// Writes `encode_term(graph_name)` as a key with an empty value into
/// `self.graphs`. Returns `Ok(true)` when that insert yields `None` —
/// presumably meaning the key was not present before (confirm against the
/// `graphs` tree's `insert` contract).
fn insert_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
Ok(self.graphs.insert(&encode_term(graph_name), &[])?.is_none())
}
pub fn clear_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<()> {
pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> std::io::Result<()> {
if graph_name.is_default_graph() {
self.dspo.clear()?;
self.dpos.clear()?;
self.dosp.clear()?;
} else {
for quad in self.quads_for_graph(graph_name) {
self.remove(&quad?)?;
for quad in self.quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
}
}
Ok(())
}
pub fn remove_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
/// Empties the six trees `gspo`, `gpos`, `gosp`, `spog`, `posg` and `ospg`.
///
/// `self.graphs` and the `dspo`/`dpos`/`dosp` trees are not touched here;
/// compare `remove_all_named_graphs`, which additionally clears
/// `self.graphs`.
///
/// The trees are cleared one after another and the first error is returned
/// through `?`, so on failure the trees cleared before the failing call have
/// already been emptied.
pub fn clear_all_named_graphs(&self) -> std::io::Result<()> {
self.gspo.clear()?;
self.gpos.clear()?;
self.gosp.clear()?;
self.spog.clear()?;
self.posg.clear()?;
self.ospg.clear()?;
Ok(())
}
/// Empties all nine quad trees: `dspo`, `dpos`, `dosp`, then `gspo`, `gpos`,
/// `gosp`, `spog`, `posg` and `ospg`.
///
/// `self.graphs` is not cleared by this method.
///
/// The trees are cleared sequentially and the first error is propagated with
/// `?`, so a failure can leave some trees already emptied and others intact.
pub fn clear_all_graphs(&self) -> std::io::Result<()> {
self.dspo.clear()?;
self.dpos.clear()?;
self.dosp.clear()?;
self.gspo.clear()?;
self.gpos.clear()?;
self.gosp.clear()?;
self.spog.clear()?;
self.posg.clear()?;
self.ospg.clear()?;
Ok(())
}
pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let graph_name = &graph_name.into();
for quad in self.quads_for_graph(graph_name) {
self.remove(&quad?)?;
self.remove_encoded(&quad?)?;
}
Ok(self.graphs.remove(&encode_term(graph_name))?.is_some())
}
/// Empties the six trees `gspo`, `gpos`, `gosp`, `spog`, `posg` and `ospg`,
/// and then also clears `self.graphs`.
///
/// The `dspo`/`dpos`/`dosp` trees are left untouched.
///
/// Each `clear()` runs in sequence and the first error is returned via `?`;
/// `self.graphs` is cleared last, so it is only reached if every preceding
/// clear succeeded.
pub fn remove_all_named_graphs(&self) -> std::io::Result<()> {
self.gspo.clear()?;
self.gpos.clear()?;
self.gosp.clear()?;
self.spog.clear()?;
self.posg.clear()?;
self.ospg.clear()?;
self.graphs.clear()?;
Ok(())
}
pub fn clear(&self) -> std::io::Result<()> {
self.dspo.clear()?;
self.dpos.clear()?;
@ -711,50 +758,51 @@ pub struct StorageTransaction<'a> {
}
impl<'a> StorageTransaction<'a> {
pub fn insert(&self, quad: &EncodedQuad) -> Result<bool, UnabortableTransactionError> {
pub fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let quad = self.encode_quad(quad)?;
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
write_spo_quad(&mut buffer, &quad);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
write_pos_quad(&mut buffer, quad);
write_pos_quad(&mut buffer, &quad);
self.dpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_osp_quad(&mut buffer, quad);
write_osp_quad(&mut buffer, &quad);
self.dosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, quad);
write_spog_quad(&mut buffer, &quad);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
write_posg_quad(&mut buffer, quad);
write_posg_quad(&mut buffer, &quad);
self.posg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
write_ospg_quad(&mut buffer, &quad);
self.ospg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
write_gspo_quad(&mut buffer, &quad);
self.gspo.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
write_gpos_quad(&mut buffer, &quad);
self.gpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
write_gosp_quad(&mut buffer, &quad);
self.gosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
@ -767,50 +815,51 @@ impl<'a> StorageTransaction<'a> {
}
}
pub fn remove(&self, quad: &EncodedQuad) -> Result<bool, UnabortableTransactionError> {
pub fn remove(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let quad = EncodedQuad::from(quad);
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
write_spo_quad(&mut buffer, &quad);
let is_present = self.dspo.remove(buffer.as_slice())?.is_some();
if is_present {
buffer.clear();
write_pos_quad(&mut buffer, quad);
write_pos_quad(&mut buffer, &quad);
self.dpos.remove(buffer.as_slice())?;
buffer.clear();
write_osp_quad(&mut buffer, quad);
write_osp_quad(&mut buffer, &quad);
self.dosp.remove(buffer.as_slice())?;
buffer.clear();
}
Ok(is_present)
} else {
write_spog_quad(&mut buffer, quad);
write_spog_quad(&mut buffer, &quad);
let is_present = self.spog.remove(buffer.as_slice())?.is_some();
if is_present {
buffer.clear();
write_posg_quad(&mut buffer, quad);
write_posg_quad(&mut buffer, &quad);
self.posg.remove(buffer.as_slice())?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
write_ospg_quad(&mut buffer, &quad);
self.ospg.remove(buffer.as_slice())?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
write_gspo_quad(&mut buffer, &quad);
self.gspo.remove(buffer.as_slice())?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
write_gpos_quad(&mut buffer, &quad);
self.gpos.remove(buffer.as_slice())?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
write_gosp_quad(&mut buffer, &quad);
self.gosp.remove(buffer.as_slice())?;
buffer.clear();
}
@ -821,9 +870,10 @@ impl<'a> StorageTransaction<'a> {
pub fn insert_named_graph(
&self,
graph_name: &EncodedTerm,
graph_name: NamedOrBlankNodeRef<'_>,
) -> Result<bool, UnabortableTransactionError> {
Ok(self.graphs.insert(encode_term(graph_name), &[])?.is_none())
let graph_name = self.encode_named_or_blank_node(graph_name)?;
Ok(self.graphs.insert(encode_term(&graph_name), &[])?.is_none())
}
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, UnabortableTransactionError> {
@ -1036,27 +1086,27 @@ impl<'a> StrContainer for StorageTransaction<'a> {
}
pub(crate) trait StorageLike: StrLookup + StrContainer {
fn insert(&self, quad: &EncodedQuad) -> Result<bool, Self::Error>;
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
fn remove(&self, quad: &EncodedQuad) -> Result<bool, Self::Error>;
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
}
impl StorageLike for Storage {
fn insert(&self, quad: &EncodedQuad) -> Result<bool, Self::Error> {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error> {
self.insert(quad)
}
fn remove(&self, quad: &EncodedQuad) -> Result<bool, Self::Error> {
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error> {
self.remove(quad)
}
}
impl<'a> StorageLike for StorageTransaction<'a> {
fn insert(&self, quad: &EncodedQuad) -> Result<bool, Self::Error> {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error> {
self.insert(quad)
}
fn remove(&self, quad: &EncodedQuad) -> Result<bool, Self::Error> {
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error> {
self.remove(quad)
}
}

@ -31,7 +31,7 @@ use crate::sparql::{
UpdateOptions,
};
use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm, WriteEncoder};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
pub use crate::storage::ConflictableTransactionError;
pub use crate::storage::TransactionError;
pub use crate::storage::UnabortableTransactionError;
@ -360,8 +360,7 @@ impl Store {
/// It might leave the store in a bad state if a crash happens during the insertion.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let quad = self.storage.encode_quad(quad.into())?;
self.storage.insert(&quad)
self.storage.insert(quad.into())
}
/// Removes a quad from this store.
@ -372,8 +371,7 @@ impl Store {
/// It might leave the store in a bad state if a crash happens during the removal.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let quad = EncodedQuad::from(quad.into());
self.storage.remove(&quad)
self.storage.remove(quad.into())
}
/// Dumps a store graph into a file.
@ -489,8 +487,7 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let graph_name = self.storage.encode_named_or_blank_node(graph_name.into())?;
self.storage.insert_named_graph(&graph_name)
self.storage.insert_named_graph(graph_name.into())
}
/// Clears a graph from this store.
@ -512,8 +509,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> io::Result<()> {
let graph_name = EncodedTerm::from(graph_name.into());
self.storage.clear_graph(&graph_name)
self.storage.clear_graph(graph_name.into())
}
/// Removes a graph from this store.
@ -540,8 +536,7 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let graph_name = EncodedTerm::from(graph_name.into());
self.storage.remove_named_graph(&graph_name)
self.storage.remove_named_graph(graph_name.into())
}
/// Clears the store.
@ -703,8 +698,7 @@ impl Transaction<'_> {
&self,
quad: impl Into<QuadRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
let quad = self.storage.encode_quad(quad.into())?;
self.storage.insert(&quad)
self.storage.insert(quad.into())
}
/// Removes a quad from this store during the transaction.
@ -714,8 +708,7 @@ impl Transaction<'_> {
&self,
quad: impl Into<QuadRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
let quad = EncodedQuad::from(quad.into());
self.storage.remove(&quad)
self.storage.remove(quad.into())
}
/// Inserts a graph into this store during the transaction
@ -725,8 +718,7 @@ impl Transaction<'_> {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
let graph_name = self.storage.encode_named_or_blank_node(graph_name.into())?;
self.storage.insert_named_graph(&graph_name)
self.storage.insert_named_graph(graph_name.into())
}
}

Loading…
Cancel
Save