diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 987f2805..1498ad63 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -131,6 +131,10 @@ impl<'a> SimpleUpdateEvaluator<'a> { self.convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)? { self.storage.remove(quad.as_ref())?; + // Hack to make sure the triple terms are still available for an insert + dataset.encode_term(quad.subject.as_ref()); + dataset.encode_term(quad.predicate.as_ref()); + dataset.encode_term(quad.object.as_ref()); } } for quad in insert { diff --git a/lib/src/storage/binary_encoder.rs b/lib/src/storage/binary_encoder.rs index 51165ca1..e028ea29 100644 --- a/lib/src/storage/binary_encoder.rs +++ b/lib/src/storage/binary_encoder.rs @@ -7,7 +7,7 @@ use std::io::{Cursor, Read}; use std::mem::size_of; use std::rc::Rc; -pub const LATEST_STORAGE_VERSION: u64 = 1; +pub const LATEST_STORAGE_VERSION: u64 = 2; pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::() + 2 * size_of::(); // Encoded term type blocks @@ -673,6 +673,11 @@ mod tests { .or_insert_with(|| value.to_owned()); Ok(()) } + + fn remove_str(&self, key: &StrHash) -> Result<(), Self::Error> { + self.id2str.borrow_mut().remove(key); + Ok(()) + } } #[test] @@ -736,7 +741,8 @@ mod tests { .into(), ]; for term in terms { - let encoded = store.encode_term(term.as_ref()).unwrap(); + let encoded = term.as_ref().into(); + store.insert_term(term.as_ref(), &encoded).unwrap(); assert_eq!(encoded, term.as_ref().into()); assert_eq!(term, store.decode_term(&encoded).unwrap()); diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 79a6c572..38fe2fe3 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -20,6 +20,7 @@ use crate::storage::binary_encoder::{ }; use crate::storage::io::StoreOrParseError; use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder}; +use std::convert::TryInto; mod binary_encoder; pub(crate) mod io; @@ -68,6 +69,7 @@ impl Storage { dosp: db.open_tree("dosp")?, graphs: db.open_tree("graphs")?, }; + this.id2str.set_merge_operator(id2str_merge); let mut version = this.ensure_version()?; if version == 0 { @@ -82,6 +84,19 @@ impl Storage { this.set_version(version)?; this.graphs.flush()?; } + if version == 1 { + // We migrate to v2 + for entry in this.id2str.iter() { + let (key, value) = entry?; + let mut new_value = Vec::with_capacity(value.len() + 4); + new_value.extend_from_slice(&u32::MAX.to_be_bytes()); + new_value.extend_from_slice(&value); + this.id2str.insert(key, new_value)?; + } + version = 2; + this.set_version(version)?; + this.id2str.flush()?; + } match version { _ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!( @@ -237,6 +252,10 @@ impl Storage { ) } + fn quads_in_named_graph(&self) -> DecodingQuadIterator { + self.gspo_quads(Vec::default()) + } + fn quads_for_subject(&self, subject: &EncodedTerm) -> ChainedDecodingQuadIterator { ChainedDecodingQuadIterator::pair( self.dspo_quads(encode_term(subject)), @@ -459,54 +478,58 @@ impl Storage { } pub fn insert(&self, quad: QuadRef<'_>) -> std::io::Result { - let quad = self.encode_quad(quad)?; let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); + let encoded = quad.into(); if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, &quad); + write_spo_quad(&mut buffer, &encoded); let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none(); if is_new { buffer.clear(); + self.insert_quad_triple(quad, &encoded)?; - write_pos_quad(&mut buffer, &quad); + write_pos_quad(&mut buffer, &encoded); self.dpos.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_osp_quad(&mut buffer, &quad); + write_osp_quad(&mut buffer, &encoded); self.dosp.insert(buffer.as_slice(), &[])?; buffer.clear(); } Ok(is_new) } else { - write_spog_quad(&mut buffer, &quad); + write_spog_quad(&mut buffer, &encoded); let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none(); if is_new { buffer.clear(); + self.insert_quad_triple(quad, &encoded)?; - write_posg_quad(&mut buffer, &quad); + write_posg_quad(&mut buffer, &encoded); self.posg.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_ospg_quad(&mut buffer, &quad); + write_ospg_quad(&mut buffer, &encoded); self.ospg.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gspo_quad(&mut buffer, &quad); + write_gspo_quad(&mut buffer, &encoded); self.gspo.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gpos_quad(&mut buffer, &quad); + write_gpos_quad(&mut buffer, &encoded); self.gpos.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gosp_quad(&mut buffer, &quad); + write_gosp_quad(&mut buffer, &encoded); self.gosp.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_term(&mut buffer, &quad.graph_name); - self.graphs.insert(&buffer, &[])?; + write_term(&mut buffer, &encoded.graph_name); + if self.graphs.insert(&buffer, &[])?.is_none() { + self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; + } buffer.clear(); } @@ -535,6 +558,8 @@ impl Storage { write_osp_quad(&mut buffer, quad); self.dosp.remove(buffer.as_slice())?; buffer.clear(); + + self.remove_quad_triple(quad)?; } Ok(is_present) @@ -564,6 +589,8 @@ impl Storage { write_gosp_quad(&mut buffer, quad); self.gosp.remove(buffer.as_slice())?; buffer.clear(); + + self.remove_quad_triple(quad)?; } Ok(is_present) @@ -571,8 +598,13 @@ impl Storage { } pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result { - let graph_name = self.encode_term(graph_name)?; - self.insert_encoded_named_graph(&graph_name) + let encoded = graph_name.into(); + Ok(if self.insert_encoded_named_graph(&encoded)? { + self.insert_term(graph_name.into(), &encoded)?; + true + } else { + false + }) } fn insert_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result { @@ -580,47 +612,39 @@ impl Storage { } pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> std::io::Result<()> { - if graph_name.is_default_graph() { - self.dspo.clear()?; - self.dpos.clear()?; - self.dosp.clear()?; - } else { - for quad in self.quads_for_graph(&graph_name.into()) { - self.remove_encoded(&quad?)?; - } + for quad in self.quads_for_graph(&graph_name.into()) { + self.remove_encoded(&quad?)?; } Ok(()) } pub fn clear_all_named_graphs(&self) -> std::io::Result<()> { - self.gspo.clear()?; - self.gpos.clear()?; - self.gosp.clear()?; - self.spog.clear()?; - self.posg.clear()?; - self.ospg.clear()?; + for quad in self.quads_in_named_graph() { + self.remove_encoded(&quad?)?; + } Ok(()) } pub fn clear_all_graphs(&self) -> std::io::Result<()> { - self.dspo.clear()?; - self.dpos.clear()?; - self.dosp.clear()?; - self.gspo.clear()?; - self.gpos.clear()?; - self.gosp.clear()?; - self.spog.clear()?; - self.posg.clear()?; - self.ospg.clear()?; + for quad in self.quads() { + self.remove_encoded(&quad?)?; + } Ok(()) } pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result { - let graph_name = &graph_name.into(); - for quad in self.quads_for_graph(graph_name) { + let graph_name = graph_name.into(); + for quad in self.quads_for_graph(&graph_name) { self.remove_encoded(&quad?)?; } - Ok(self.graphs.remove(&encode_term(graph_name))?.is_some()) + Ok( + if self.graphs.remove(&encode_term(&graph_name))?.is_some() { + self.remove_term(&graph_name)?; + true + } else { + false + }, + ) } pub fn remove_all_named_graphs(&self) -> std::io::Result<()> { @@ -662,7 +686,7 @@ impl Storage { pub fn get_str(&self, key: &StrHash) -> std::io::Result> { self.id2str .get(key.to_be_bytes())? - .map(|v| String::from_utf8(v.to_vec())) + .map(|v| String::from_utf8(v[4..].to_vec())) .transpose() .map_err(invalid_data_error) } @@ -754,55 +778,59 @@ pub struct StorageTransaction<'a> { impl<'a> StorageTransaction<'a> { pub fn insert(&self, quad: QuadRef<'_>) -> Result { - let quad = self.encode_quad(quad)?; + let encoded = quad.into(); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, &quad); + write_spo_quad(&mut buffer, &encoded); let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none(); if is_new { buffer.clear(); + self.insert_quad_triple(quad, &encoded)?; - write_pos_quad(&mut buffer, &quad); + write_pos_quad(&mut buffer, &encoded); self.dpos.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_osp_quad(&mut buffer, &quad); + write_osp_quad(&mut buffer, &encoded); self.dosp.insert(buffer.as_slice(), &[])?; buffer.clear(); } Ok(is_new) } else { - write_spog_quad(&mut buffer, &quad); + write_spog_quad(&mut buffer, &encoded); let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none(); if is_new { buffer.clear(); + self.insert_quad_triple(quad, &encoded)?; - write_posg_quad(&mut buffer, &quad); + write_posg_quad(&mut buffer, &encoded); self.posg.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_ospg_quad(&mut buffer, &quad); + write_ospg_quad(&mut buffer, &encoded); self.ospg.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gspo_quad(&mut buffer, &quad); + write_gspo_quad(&mut buffer, &encoded); self.gspo.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gpos_quad(&mut buffer, &quad); + write_gpos_quad(&mut buffer, &encoded); self.gpos.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_gosp_quad(&mut buffer, &quad); + write_gosp_quad(&mut buffer, &encoded); self.gosp.insert(buffer.as_slice(), &[])?; buffer.clear(); - write_term(&mut buffer, &quad.graph_name); - self.graphs.insert(buffer.as_slice(), &[])?; + write_term(&mut buffer, &encoded.graph_name); + if self.graphs.insert(buffer.as_slice(), &[])?.is_none() { + self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; + } buffer.clear(); } @@ -828,6 +856,8 @@ impl<'a> StorageTransaction<'a> { write_osp_quad(&mut buffer, &quad); self.dosp.remove(buffer.as_slice())?; buffer.clear(); + + self.remove_quad_triple(&quad)?; } Ok(is_present) @@ -857,6 +887,8 @@ impl<'a> StorageTransaction<'a> { write_gosp_quad(&mut buffer, &quad); self.gosp.remove(buffer.as_slice())?; buffer.clear(); + + self.remove_quad_triple(&quad)?; } Ok(is_present) @@ -867,14 +899,21 @@ impl<'a> StorageTransaction<'a> { &self, graph_name: NamedOrBlankNodeRef<'_>, ) -> Result { - let graph_name = self.encode_term(graph_name)?; - Ok(self.graphs.insert(encode_term(&graph_name), &[])?.is_none()) + let encoded = graph_name.into(); + Ok( + if self.graphs.insert(encode_term(&encoded), &[])?.is_none() { + self.insert_term(graph_name.into(), &encoded)?; + true + } else { + false + }, + ) } pub fn get_str(&self, key: &StrHash) -> Result, UnabortableTransactionError> { self.id2str .get(key.to_be_bytes())? - .map(|v| String::from_utf8(v.to_vec())) + .map(|v| String::from_utf8(v[4..].to_vec())) .transpose() .map_err(|e| UnabortableTransactionError::Storage(invalid_data_error(e))) } @@ -1064,7 +1103,23 @@ impl TermEncoder for Storage { type Error = std::io::Error; fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> { - self.id2str.insert(key.to_be_bytes(), value)?; + self.id2str.merge(key.to_be_bytes(), value)?; + Ok(()) + } + + fn remove_str(&self, key: &StrHash) -> std::io::Result<()> { + self.id2str.update_and_fetch(key.to_be_bytes(), |old| { + let old = old?; + match u32::from_be_bytes(old[..4].try_into().ok()?) { + 0 | 1 => None, + u32::MAX => Some(old.to_vec()), + number => { + let mut value = old.to_vec(); + value[..4].copy_from_slice(&(number - 1).to_be_bytes()); + Some(value) + } + } + })?; Ok(()) } } @@ -1073,7 +1128,37 @@ impl<'a> TermEncoder for StorageTransaction<'a> { type Error = UnabortableTransactionError; fn insert_str(&self, key: &StrHash, value: &str) -> Result<(), UnabortableTransactionError> { - self.id2str.insert(&key.to_be_bytes(), value)?; + let new_value = if let Some(old) = self.id2str.get(key.to_be_bytes())? { + let mut new_value = old.to_vec(); + let number = u32::from_be_bytes(new_value[..4].try_into().ok().unwrap_or_default()); + new_value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check + new_value + } else { + let mut new_value = Vec::with_capacity(value.len() + 4); + new_value.extend_from_slice(&1_u32.to_be_bytes()); + new_value.extend_from_slice(value.as_bytes()); + new_value + }; + self.id2str.insert(&key.to_be_bytes(), new_value)?; + Ok(()) + } + + fn remove_str(&self, key: &StrHash) -> Result<(), UnabortableTransactionError> { + if let Some(old) = self.id2str.get(key.to_be_bytes())? { + if let Ok(number) = old[..4].try_into() { + match u32::from_be_bytes(number) { + 0 | 1 => { + self.id2str.remove(&key.to_be_bytes())?; + } + u32::MAX => (), + number => { + let mut value = old; + value[..4].copy_from_slice(&(number - 1).to_be_bytes()); + self.id2str.insert(&key.to_be_bytes(), value)?; + } + } + } + } Ok(()) } } @@ -1103,3 +1188,129 @@ impl<'a> StorageLike for StorageTransaction<'a> { self.remove(quad) } } + +fn id2str_merge( + _key: &[u8], // the key being merged + old_value: Option<&[u8]>, // the previous value, if one existed + merged_bytes: &[u8], // the new bytes being merged in +) -> Option> { + Some(if let Some(value) = old_value { + let mut value = value.to_vec(); + let number = u32::from_be_bytes(value[..4].try_into().ok()?); + value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check + value + } else { + let mut value = Vec::with_capacity(merged_bytes.len() + 4); + value.extend_from_slice(&1_u32.to_be_bytes()); + value.extend_from_slice(merged_bytes); + value + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::model::NamedNodeRef; + + #[test] + fn test_strings_removal() -> std::io::Result<()> { + let quad = QuadRef::new( + NamedNodeRef::new_unchecked("http://example.com/s"), + NamedNodeRef::new_unchecked("http://example.com/p"), + NamedNodeRef::new_unchecked("http://example.com/o"), + NamedNodeRef::new_unchecked("http://example.com/g"), + ); + let quad2 = QuadRef::new( + NamedNodeRef::new_unchecked("http://example.com/s"), + NamedNodeRef::new_unchecked("http://example.com/p"), + NamedNodeRef::new_unchecked("http://example.com/o2"), + NamedNodeRef::new_unchecked("http://example.com/g"), + ); + + let storage = Storage::new()?; + storage.insert(quad)?; + storage.insert(quad2)?; + storage.remove(quad2)?; + assert!(storage + .get_str(&StrHash::new("http://example.com/s"))? + .is_some()); + assert!(storage + .get_str(&StrHash::new("http://example.com/p"))? + .is_some()); + assert!(storage + .get_str(&StrHash::new("http://example.com/o2"))? + .is_none()); + storage.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + assert!(storage + .get_str(&StrHash::new("http://example.com/s"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/p"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/o"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/g"))? + .is_some()); + storage.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + assert!(storage + .get_str(&StrHash::new("http://example.com/g"))? + .is_none()); + Ok(()) + } + + #[test] + fn test_strings_removal_in_transaction() -> std::io::Result<()> { + let quad = QuadRef::new( + NamedNodeRef::new_unchecked("http://example.com/s"), + NamedNodeRef::new_unchecked("http://example.com/p"), + NamedNodeRef::new_unchecked("http://example.com/o"), + NamedNodeRef::new_unchecked("http://example.com/g"), + ); + let quad2 = QuadRef::new( + NamedNodeRef::new_unchecked("http://example.com/s"), + NamedNodeRef::new_unchecked("http://example.com/p"), + NamedNodeRef::new_unchecked("http://example.com/o2"), + NamedNodeRef::new_unchecked("http://example.com/g"), + ); + + let storage = Storage::new()?; + transac(&storage, |t| t.insert(quad))?; + transac(&storage, |t| t.insert(quad2))?; + transac(&storage, |t| t.remove(quad2))?; + assert!(storage + .get_str(&StrHash::new("http://example.com/s"))? + .is_some()); + assert!(storage + .get_str(&StrHash::new("http://example.com/p"))? + .is_some()); + assert!(storage + .get_str(&StrHash::new("http://example.com/o2"))? + .is_none()); + transac(&storage, |t| t.remove(quad))?; + assert!(storage + .get_str(&StrHash::new("http://example.com/s"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/p"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/o"))? + .is_none()); + assert!(storage + .get_str(&StrHash::new("http://example.com/g"))? + .is_some()); + Ok(()) + } + + fn transac( + storage: &Storage, + f: impl Fn(StorageTransaction<'_>) -> Result, + ) -> Result<(), TransactionError> { + storage.transaction(|t| { + f(t)?; + Ok(()) + }) + } +} diff --git a/lib/src/storage/numeric_encoder.rs b/lib/src/storage/numeric_encoder.rs index 525a7e01..34e97054 100644 --- a/lib/src/storage/numeric_encoder.rs +++ b/lib/src/storage/numeric_encoder.rs @@ -670,24 +670,44 @@ pub(super) trait TermEncoder { fn insert_str(&self, key: &StrHash, value: &str) -> Result<(), Self::Error>; - fn encode_term<'a>(&self, term: impl Into>) -> Result { - let term = term.into(); - let encoded = term.into(); - insert_term_values(term, &encoded, |key, value| self.insert_str(key, value))?; - Ok(encoded) + fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<(), Self::Error> { + insert_term_values(term, encoded, |key, value| self.insert_str(key, value)) } - fn encode_quad(&self, quad: QuadRef<'_>) -> Result { - Ok(EncodedQuad { - subject: self.encode_term(quad.subject)?, - predicate: self.encode_term(quad.predicate)?, - object: self.encode_term(quad.object)?, - graph_name: match quad.graph_name { - GraphNameRef::NamedNode(graph_name) => self.encode_term(graph_name)?, - GraphNameRef::BlankNode(graph_name) => self.encode_term(graph_name)?, - GraphNameRef::DefaultGraph => EncodedTerm::DefaultGraph, - }, - }) + fn insert_graph_name( + &self, + graph_name: GraphNameRef<'_>, + encoded: &EncodedTerm, + ) -> Result<(), Self::Error> { + match graph_name { + GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded), + GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded), + GraphNameRef::DefaultGraph => Ok(()), + } + } + + fn insert_quad_triple( + &self, + quad: QuadRef<'_>, + encoded: &EncodedQuad, + ) -> Result<(), Self::Error> { + self.insert_term(quad.subject.into(), &encoded.subject)?; + self.insert_term(quad.predicate.into(), &encoded.predicate)?; + self.insert_term(quad.object, &encoded.object)?; + Ok(()) + } + + fn remove_str(&self, key: &StrHash) -> Result<(), Self::Error>; + + fn remove_term(&self, encoded: &EncodedTerm) -> Result<(), Self::Error> { + remove_term_values(encoded, |key| self.remove_str(key)) + } + + fn remove_quad_triple(&self, encoded: &EncodedQuad) -> Result<(), Self::Error> { + self.remove_term(&encoded.subject)?; + self.remove_term(&encoded.predicate)?; + self.remove_term(&encoded.object)?; + Ok(()) } } @@ -753,6 +773,53 @@ pub fn insert_term_values Result<(), E> + Copy>( Ok(()) } +pub fn remove_term_values Result<(), E> + Copy>( + encoded: &EncodedTerm, + remove_str: F, +) -> Result<(), E> { + match encoded { + EncodedTerm::NamedNode { iri_id } => { + remove_str(iri_id)?; + } + EncodedTerm::BigBlankNode { id_id } => { + remove_str(id_id)?; + } + EncodedTerm::BigStringLiteral { value_id } => { + remove_str(value_id)?; + } + EncodedTerm::SmallBigLangStringLiteral { language_id, .. } => { + remove_str(language_id)?; + } + EncodedTerm::BigSmallLangStringLiteral { value_id, .. } => { + remove_str(value_id)?; + } + EncodedTerm::BigBigLangStringLiteral { + value_id, + language_id, + } => { + remove_str(value_id)?; + remove_str(language_id)?; + } + EncodedTerm::SmallTypedLiteral { datatype_id, .. } => { + remove_str(datatype_id)?; + } + EncodedTerm::BigTypedLiteral { + value_id, + datatype_id, + } => { + remove_str(value_id)?; + remove_str(datatype_id)?; + } + EncodedTerm::Triple(encoded) => { + remove_term_values(&encoded.subject, remove_str)?; + remove_term_values(&encoded.predicate, remove_str)?; + remove_term_values(&encoded.object, remove_str)?; + } + _ => (), + } + Ok(()) +} + pub fn parse_boolean_str(value: &str) -> Option { match value { "true" | "1" => Some(EncodedTerm::BooleanLiteral(true)),