String garbage collector

pull/171/head
Tpt 4 years ago
parent 8a1683eba2
commit 1abda73dc9
  1. 4
      lib/src/sparql/update.rs
  2. 10
      lib/src/storage/binary_encoder.rs
  3. 321
      lib/src/storage/mod.rs
  4. 103
      lib/src/storage/numeric_encoder.rs

@ -131,6 +131,10 @@ impl<'a> SimpleUpdateEvaluator<'a> {
self.convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)?
{
self.storage.remove(quad.as_ref())?;
// Hack to make sure the triple terms are still available for an insert
dataset.encode_term(quad.subject.as_ref());
dataset.encode_term(quad.predicate.as_ref());
dataset.encode_term(quad.object.as_ref());
}
}
for quad in insert {

@ -7,7 +7,7 @@ use std::io::{Cursor, Read};
use std::mem::size_of;
use std::rc::Rc;
pub const LATEST_STORAGE_VERSION: u64 = 1;
pub const LATEST_STORAGE_VERSION: u64 = 2;
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();
// Encoded term type blocks
@ -673,6 +673,11 @@ mod tests {
.or_insert_with(|| value.to_owned());
Ok(())
}
fn remove_str(&self, key: &StrHash) -> Result<(), Self::Error> {
self.id2str.borrow_mut().remove(key);
Ok(())
}
}
#[test]
@ -736,7 +741,8 @@ mod tests {
.into(),
];
for term in terms {
let encoded = store.encode_term(term.as_ref()).unwrap();
let encoded = term.as_ref().into();
store.insert_term(term.as_ref(), &encoded).unwrap();
assert_eq!(encoded, term.as_ref().into());
assert_eq!(term, store.decode_term(&encoded).unwrap());

@ -20,6 +20,7 @@ use crate::storage::binary_encoder::{
};
use crate::storage::io::StoreOrParseError;
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
use std::convert::TryInto;
mod binary_encoder;
pub(crate) mod io;
@ -68,6 +69,7 @@ impl Storage {
dosp: db.open_tree("dosp")?,
graphs: db.open_tree("graphs")?,
};
this.id2str.set_merge_operator(id2str_merge);
let mut version = this.ensure_version()?;
if version == 0 {
@ -82,6 +84,19 @@ impl Storage {
this.set_version(version)?;
this.graphs.flush()?;
}
if version == 1 {
// We migrate to v2
for entry in this.id2str.iter() {
let (key, value) = entry?;
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes());
new_value.extend_from_slice(&value);
this.id2str.insert(key, new_value)?;
}
version = 2;
this.set_version(version)?;
this.id2str.flush()?;
}
match version {
_ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!(
@ -237,6 +252,10 @@ impl Storage {
)
}
fn quads_in_named_graph(&self) -> DecodingQuadIterator {
self.gspo_quads(Vec::default())
}
fn quads_for_subject(&self, subject: &EncodedTerm) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dspo_quads(encode_term(subject)),
@ -459,54 +478,58 @@ impl Storage {
}
pub fn insert(&self, quad: QuadRef<'_>) -> std::io::Result<bool> {
let quad = self.encode_quad(quad)?;
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
let encoded = quad.into();
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &quad);
write_spo_quad(&mut buffer, &encoded);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_pos_quad(&mut buffer, &quad);
write_pos_quad(&mut buffer, &encoded);
self.dpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_osp_quad(&mut buffer, &quad);
write_osp_quad(&mut buffer, &encoded);
self.dosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, &quad);
write_spog_quad(&mut buffer, &encoded);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_posg_quad(&mut buffer, &quad);
write_posg_quad(&mut buffer, &encoded);
self.posg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, &quad);
write_ospg_quad(&mut buffer, &encoded);
self.ospg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, &quad);
write_gspo_quad(&mut buffer, &encoded);
self.gspo.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, &quad);
write_gpos_quad(&mut buffer, &encoded);
self.gpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, &quad);
write_gosp_quad(&mut buffer, &encoded);
self.gosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_term(&mut buffer, &quad.graph_name);
self.graphs.insert(&buffer, &[])?;
write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert(&buffer, &[])?.is_none() {
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
}
@ -535,6 +558,8 @@ impl Storage {
write_osp_quad(&mut buffer, quad);
self.dosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(quad)?;
}
Ok(is_present)
@ -564,6 +589,8 @@ impl Storage {
write_gosp_quad(&mut buffer, quad);
self.gosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(quad)?;
}
Ok(is_present)
@ -571,8 +598,13 @@ impl Storage {
}
pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let graph_name = self.encode_term(graph_name)?;
self.insert_encoded_named_graph(&graph_name)
let encoded = graph_name.into();
Ok(if self.insert_encoded_named_graph(&encoded)? {
self.insert_term(graph_name.into(), &encoded)?;
true
} else {
false
})
}
fn insert_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
@ -580,47 +612,39 @@ impl Storage {
}
pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> std::io::Result<()> {
if graph_name.is_default_graph() {
self.dspo.clear()?;
self.dpos.clear()?;
self.dosp.clear()?;
} else {
for quad in self.quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
}
}
Ok(())
}
pub fn clear_all_named_graphs(&self) -> std::io::Result<()> {
self.gspo.clear()?;
self.gpos.clear()?;
self.gosp.clear()?;
self.spog.clear()?;
self.posg.clear()?;
self.ospg.clear()?;
for quad in self.quads_in_named_graph() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_graphs(&self) -> std::io::Result<()> {
self.dspo.clear()?;
self.dpos.clear()?;
self.dosp.clear()?;
self.gspo.clear()?;
self.gpos.clear()?;
self.gosp.clear()?;
self.spog.clear()?;
self.posg.clear()?;
self.ospg.clear()?;
for quad in self.quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let graph_name = &graph_name.into();
for quad in self.quads_for_graph(graph_name) {
let graph_name = graph_name.into();
for quad in self.quads_for_graph(&graph_name) {
self.remove_encoded(&quad?)?;
}
Ok(self.graphs.remove(&encode_term(graph_name))?.is_some())
Ok(
if self.graphs.remove(&encode_term(&graph_name))?.is_some() {
self.remove_term(&graph_name)?;
true
} else {
false
},
)
}
pub fn remove_all_named_graphs(&self) -> std::io::Result<()> {
@ -662,7 +686,7 @@ impl Storage {
pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> {
self.id2str
.get(key.to_be_bytes())?
.map(|v| String::from_utf8(v.to_vec()))
.map(|v| String::from_utf8(v[4..].to_vec()))
.transpose()
.map_err(invalid_data_error)
}
@ -754,55 +778,59 @@ pub struct StorageTransaction<'a> {
impl<'a> StorageTransaction<'a> {
pub fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let quad = self.encode_quad(quad)?;
let encoded = quad.into();
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &quad);
write_spo_quad(&mut buffer, &encoded);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_pos_quad(&mut buffer, &quad);
write_pos_quad(&mut buffer, &encoded);
self.dpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_osp_quad(&mut buffer, &quad);
write_osp_quad(&mut buffer, &encoded);
self.dosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, &quad);
write_spog_quad(&mut buffer, &encoded);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_posg_quad(&mut buffer, &quad);
write_posg_quad(&mut buffer, &encoded);
self.posg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, &quad);
write_ospg_quad(&mut buffer, &encoded);
self.ospg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, &quad);
write_gspo_quad(&mut buffer, &encoded);
self.gspo.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, &quad);
write_gpos_quad(&mut buffer, &encoded);
self.gpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, &quad);
write_gosp_quad(&mut buffer, &encoded);
self.gosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_term(&mut buffer, &quad.graph_name);
self.graphs.insert(buffer.as_slice(), &[])?;
write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert(buffer.as_slice(), &[])?.is_none() {
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
}
@ -828,6 +856,8 @@ impl<'a> StorageTransaction<'a> {
write_osp_quad(&mut buffer, &quad);
self.dosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(&quad)?;
}
Ok(is_present)
@ -857,6 +887,8 @@ impl<'a> StorageTransaction<'a> {
write_gosp_quad(&mut buffer, &quad);
self.gosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(&quad)?;
}
Ok(is_present)
@ -867,14 +899,21 @@ impl<'a> StorageTransaction<'a> {
&self,
graph_name: NamedOrBlankNodeRef<'_>,
) -> Result<bool, UnabortableTransactionError> {
let graph_name = self.encode_term(graph_name)?;
Ok(self.graphs.insert(encode_term(&graph_name), &[])?.is_none())
let encoded = graph_name.into();
Ok(
if self.graphs.insert(encode_term(&encoded), &[])?.is_none() {
self.insert_term(graph_name.into(), &encoded)?;
true
} else {
false
},
)
}
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, UnabortableTransactionError> {
self.id2str
.get(key.to_be_bytes())?
.map(|v| String::from_utf8(v.to_vec()))
.map(|v| String::from_utf8(v[4..].to_vec()))
.transpose()
.map_err(|e| UnabortableTransactionError::Storage(invalid_data_error(e)))
}
@ -1064,7 +1103,23 @@ impl TermEncoder for Storage {
type Error = std::io::Error;
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
self.id2str.insert(key.to_be_bytes(), value)?;
self.id2str.merge(key.to_be_bytes(), value)?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
self.id2str.update_and_fetch(key.to_be_bytes(), |old| {
let old = old?;
match u32::from_be_bytes(old[..4].try_into().ok()?) {
0 | 1 => None,
u32::MAX => Some(old.to_vec()),
number => {
let mut value = old.to_vec();
value[..4].copy_from_slice(&(number - 1).to_be_bytes());
Some(value)
}
}
})?;
Ok(())
}
}
@ -1073,7 +1128,37 @@ impl<'a> TermEncoder for StorageTransaction<'a> {
type Error = UnabortableTransactionError;
fn insert_str(&self, key: &StrHash, value: &str) -> Result<(), UnabortableTransactionError> {
let new_value = if let Some(old) = self.id2str.get(key.to_be_bytes())? {
let mut new_value = old.to_vec();
let number = u32::from_be_bytes(new_value[..4].try_into().ok().unwrap_or_default());
new_value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check
new_value
} else {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&1_u32.to_be_bytes());
new_value.extend_from_slice(value.as_bytes());
new_value
};
self.id2str.insert(&key.to_be_bytes(), new_value)?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> Result<(), UnabortableTransactionError> {
if let Some(old) = self.id2str.get(key.to_be_bytes())? {
if let Ok(number) = old[..4].try_into() {
match u32::from_be_bytes(number) {
0 | 1 => {
self.id2str.remove(&key.to_be_bytes())?;
}
u32::MAX => (),
number => {
let mut value = old;
value[..4].copy_from_slice(&(number - 1).to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), value)?;
}
}
}
}
Ok(())
}
}
@ -1103,3 +1188,129 @@ impl<'a> StorageLike for StorageTransaction<'a> {
self.remove(quad)
}
}
fn id2str_merge(
_key: &[u8], // the key being merged
old_value: Option<&[u8]>, // the previous value, if one existed
merged_bytes: &[u8], // the new bytes being merged in
) -> Option<Vec<u8>> {
Some(if let Some(value) = old_value {
let mut value = value.to_vec();
let number = u32::from_be_bytes(value[..4].try_into().ok()?);
value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check
value
} else {
let mut value = Vec::with_capacity(merged_bytes.len() + 4);
value.extend_from_slice(&1_u32.to_be_bytes());
value.extend_from_slice(merged_bytes);
value
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::NamedNodeRef;
#[test]
fn test_strings_removal() -> std::io::Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let quad2 = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o2"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let storage = Storage::new()?;
storage.insert(quad)?;
storage.insert(quad2)?;
storage.remove(quad2)?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/o2"))?
.is_none());
storage.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/o"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_some());
storage.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_none());
Ok(())
}
#[test]
fn test_strings_removal_in_transaction() -> std::io::Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let quad2 = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o2"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let storage = Storage::new()?;
transac(&storage, |t| t.insert(quad))?;
transac(&storage, |t| t.insert(quad2))?;
transac(&storage, |t| t.remove(quad2))?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/o2"))?
.is_none());
transac(&storage, |t| t.remove(quad))?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/o"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_some());
Ok(())
}
fn transac<T>(
storage: &Storage,
f: impl Fn(StorageTransaction<'_>) -> Result<T, UnabortableTransactionError>,
) -> Result<(), TransactionError<std::io::Error>> {
storage.transaction(|t| {
f(t)?;
Ok(())
})
}
}

@ -670,24 +670,44 @@ pub(super) trait TermEncoder {
fn insert_str(&self, key: &StrHash, value: &str) -> Result<(), Self::Error>;
fn encode_term<'a>(&self, term: impl Into<TermRef<'a>>) -> Result<EncodedTerm, Self::Error> {
let term = term.into();
let encoded = term.into();
insert_term_values(term, &encoded, |key, value| self.insert_str(key, value))?;
Ok(encoded)
}
fn encode_quad(&self, quad: QuadRef<'_>) -> Result<EncodedQuad, Self::Error> {
Ok(EncodedQuad {
subject: self.encode_term(quad.subject)?,
predicate: self.encode_term(quad.predicate)?,
object: self.encode_term(quad.object)?,
graph_name: match quad.graph_name {
GraphNameRef::NamedNode(graph_name) => self.encode_term(graph_name)?,
GraphNameRef::BlankNode(graph_name) => self.encode_term(graph_name)?,
GraphNameRef::DefaultGraph => EncodedTerm::DefaultGraph,
},
})
fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<(), Self::Error> {
insert_term_values(term, encoded, |key, value| self.insert_str(key, value))
}
fn insert_graph_name(
&self,
graph_name: GraphNameRef<'_>,
encoded: &EncodedTerm,
) -> Result<(), Self::Error> {
match graph_name {
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::DefaultGraph => Ok(()),
}
}
fn insert_quad_triple(
&self,
quad: QuadRef<'_>,
encoded: &EncodedQuad,
) -> Result<(), Self::Error> {
self.insert_term(quad.subject.into(), &encoded.subject)?;
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
self.insert_term(quad.object, &encoded.object)?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> Result<(), Self::Error>;
fn remove_term(&self, encoded: &EncodedTerm) -> Result<(), Self::Error> {
remove_term_values(encoded, |key| self.remove_str(key))
}
fn remove_quad_triple(&self, encoded: &EncodedQuad) -> Result<(), Self::Error> {
self.remove_term(&encoded.subject)?;
self.remove_term(&encoded.predicate)?;
self.remove_term(&encoded.object)?;
Ok(())
}
}
@ -753,6 +773,53 @@ pub fn insert_term_values<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
Ok(())
}
pub fn remove_term_values<E, F: Fn(&StrHash) -> Result<(), E> + Copy>(
encoded: &EncodedTerm,
remove_str: F,
) -> Result<(), E> {
match encoded {
EncodedTerm::NamedNode { iri_id } => {
remove_str(iri_id)?;
}
EncodedTerm::BigBlankNode { id_id } => {
remove_str(id_id)?;
}
EncodedTerm::BigStringLiteral { value_id } => {
remove_str(value_id)?;
}
EncodedTerm::SmallBigLangStringLiteral { language_id, .. } => {
remove_str(language_id)?;
}
EncodedTerm::BigSmallLangStringLiteral { value_id, .. } => {
remove_str(value_id)?;
}
EncodedTerm::BigBigLangStringLiteral {
value_id,
language_id,
} => {
remove_str(value_id)?;
remove_str(language_id)?;
}
EncodedTerm::SmallTypedLiteral { datatype_id, .. } => {
remove_str(datatype_id)?;
}
EncodedTerm::BigTypedLiteral {
value_id,
datatype_id,
} => {
remove_str(value_id)?;
remove_str(datatype_id)?;
}
EncodedTerm::Triple(encoded) => {
remove_term_values(&encoded.subject, remove_str)?;
remove_term_values(&encoded.predicate, remove_str)?;
remove_term_values(&encoded.object, remove_str)?;
}
_ => (),
}
Ok(())
}
pub fn parse_boolean_str(value: &str) -> Option<EncodedTerm> {
match value {
"true" | "1" => Some(EncodedTerm::BooleanLiteral(true)),

Loading…
Cancel
Save