Use lasso for MemoryStore strings

pull/46/head
Tpt 4 years ago
parent 79fcd9bc00
commit c492c70780
  1. 2
      lib/Cargo.toml
  2. 226
      lib/src/store/memory.rs
  3. 10
      lib/src/store/mod.rs

@ -35,7 +35,7 @@ hex = "0.4"
nom = "5" nom = "5"
peg = "0.6" peg = "0.6"
siphasher = "0.3" siphasher = "0.3"
lasso = "0.3" lasso = {version="0.3", features=["multi-threaded"]}
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3" js-sys = "0.3"

@ -5,20 +5,19 @@ use crate::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser};
use crate::model::*; use crate::model::*;
use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{EvaluationError, Query, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::{ use crate::store::numeric_encoder::{
write_term, Decoder, ReadEncoder, StrContainer, StrHash, StrLookup, WithStoreError, Decoder, ReadEncoder, StrContainer, StrId, StrLookup, WithStoreError, WriteEncoder,
WriteEncoder, WRITTEN_TERM_MAX_SIZE,
}; };
use crate::store::{ use crate::store::{
dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph, dump_dataset, dump_graph, get_encoded_quad_pattern, load_dataset, load_graph,
ReadableEncodedStore, WritableEncodedStore, ReadableEncodedStore, WritableEncodedStore,
}; };
use lasso::{LargeSpur, ThreadedRodeo};
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::convert::{Infallible, TryInto}; use std::convert::{Infallible, TryInto};
use std::hash::{BuildHasherDefault, Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::io::{BufRead, Write}; use std::io::{BufRead, Write};
use std::iter::FromIterator; use std::iter::FromIterator;
use std::mem::size_of;
use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard}; use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
use std::vec::IntoIter; use std::vec::IntoIter;
use std::{fmt, io}; use std::{fmt, io};
@ -53,14 +52,13 @@ use std::{fmt, io};
#[derive(Clone)] #[derive(Clone)]
pub struct MemoryStore { pub struct MemoryStore {
indexes: Arc<RwLock<MemoryStoreIndexes>>, indexes: Arc<RwLock<MemoryStoreIndexes>>,
strings: Arc<ThreadedRodeo<LargeSpur>>,
} }
type TrivialHashMap<K, V> = HashMap<K, V, BuildHasherDefault<TrivialHasher>>; type TripleMap<T> = HashMap<T, HashMap<T, HashSet<T>>>;
type TrivialHashSet<T> = HashSet<T, BuildHasherDefault<TrivialHasher>>; type QuadMap<T> = HashMap<T, TripleMap<T>>;
type TripleMap<T> = TrivialHashMap<T, TrivialHashMap<T, TrivialHashSet<T>>>; type EncodedTerm = crate::store::numeric_encoder::EncodedTerm<LargeSpur>;
type QuadMap<T> = TrivialHashMap<T, TripleMap<T>>; type EncodedQuad = crate::store::numeric_encoder::EncodedQuad<LargeSpur>;
type EncodedTerm = crate::store::numeric_encoder::EncodedTerm<StrHash>;
type EncodedQuad = crate::store::numeric_encoder::EncodedQuad<StrHash>;
#[derive(Default)] #[derive(Default)]
struct MemoryStoreIndexes { struct MemoryStoreIndexes {
@ -70,7 +68,6 @@ struct MemoryStoreIndexes {
gspo: QuadMap<EncodedTerm>, gspo: QuadMap<EncodedTerm>,
gpos: QuadMap<EncodedTerm>, gpos: QuadMap<EncodedTerm>,
gosp: QuadMap<EncodedTerm>, gosp: QuadMap<EncodedTerm>,
id2str: HashMap<StrHash, String>,
} }
impl Default for MemoryStore { impl Default for MemoryStore {
@ -84,6 +81,7 @@ impl MemoryStore {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
indexes: Arc::new(RwLock::default()), indexes: Arc::new(RwLock::default()),
strings: Arc::new(ThreadedRodeo::new()),
} }
} }
@ -247,15 +245,16 @@ impl MemoryStore {
let mut transaction = MemoryTransaction { ops: Vec::new() }; let mut transaction = MemoryTransaction { ops: Vec::new() };
f(&mut transaction)?; f(&mut transaction)?;
let mut this = self;
let mut indexes = self.indexes_mut(); let mut indexes = self.indexes_mut();
for op in transaction.ops { for op in transaction.ops {
match op { match op {
TransactionOp::Insert(quad) => { TransactionOp::Insert(quad) => {
let quad = indexes.encode_quad(&quad).unwrap_infallible(); let quad = this.encode_quad(&quad).unwrap_infallible();
indexes.insert_encoded(&quad).unwrap_infallible() indexes.insert_encoded(&quad).unwrap_infallible()
} }
TransactionOp::Delete(quad) => { TransactionOp::Delete(quad) => {
let quad = indexes.encode_quad(&quad).unwrap_infallible(); let quad = this.encode_quad(&quad).unwrap_infallible();
indexes.remove_encoded(&quad).unwrap_infallible() indexes.remove_encoded(&quad).unwrap_infallible()
} }
} }
@ -335,16 +334,16 @@ impl MemoryStore {
/// Adds a quad to this store. /// Adds a quad to this store.
#[allow(clippy::needless_pass_by_value)] #[allow(clippy::needless_pass_by_value)]
pub fn insert(&self, quad: Quad) { pub fn insert(&self, quad: Quad) {
let mut indexes = self.indexes_mut(); let mut this = self;
let quad = indexes.encode_quad(&quad).unwrap_infallible(); let quad = this.encode_quad(&quad).unwrap_infallible();
indexes.insert_encoded(&quad).unwrap_infallible(); this.insert_encoded(&quad).unwrap_infallible();
} }
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) { pub fn remove(&self, quad: &Quad) {
let mut indexes = self.indexes_mut(); if let Some(quad) = self.get_encoded_quad(quad).unwrap_infallible() {
if let Some(quad) = indexes.get_encoded_quad(quad).unwrap_infallible() { let mut this = self;
indexes.remove_encoded(&quad).unwrap_infallible(); this.remove_encoded(&quad).unwrap_infallible();
} }
} }
@ -708,22 +707,23 @@ impl MemoryStore {
impl WithStoreError for MemoryStore { impl WithStoreError for MemoryStore {
type Error = Infallible; type Error = Infallible;
type StrId = StrHash; type StrId = LargeSpur;
} }
impl StrLookup for MemoryStore { impl StrLookup for MemoryStore {
fn get_str(&self, id: StrHash) -> Result<Option<String>, Infallible> { fn get_str(&self, id: LargeSpur) -> Result<Option<String>, Infallible> {
self.indexes().get_str(id) //TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.strings.try_resolve(&id).map(|e| e.to_owned()))
} }
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, Infallible> { fn get_str_id(&self, value: &str) -> Result<Option<LargeSpur>, Infallible> {
self.indexes().get_str_id(value) Ok(self.strings.get(value))
} }
} }
impl<'a> StrContainer for &'a MemoryStore { impl<'a> StrContainer for &'a MemoryStore {
fn insert_str(&mut self, value: &str) -> Result<StrHash, Infallible> { fn insert_str(&mut self, value: &str) -> Result<LargeSpur, Infallible> {
self.indexes_mut().insert_str(value) Ok(self.strings.get_or_intern(value))
} }
} }
@ -757,33 +757,8 @@ impl<'a> WritableEncodedStore for &'a MemoryStore {
impl WithStoreError for MemoryStoreIndexes { impl WithStoreError for MemoryStoreIndexes {
type Error = Infallible; type Error = Infallible;
type StrId = StrHash; type StrId = LargeSpur;
}
impl StrLookup for MemoryStoreIndexes {
fn get_str(&self, id: StrHash) -> Result<Option<String>, Infallible> {
//TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.id2str.get(&id).cloned())
}
fn get_str_id(&self, value: &str) -> Result<Option<StrHash>, Infallible> {
let id = StrHash::new(value);
Ok(if self.id2str.contains_key(&id) {
Some(id)
} else {
None
})
}
}
impl StrContainer for MemoryStoreIndexes {
fn insert_str(&mut self, value: &str) -> Result<StrHash, Infallible> {
let key = StrHash::new(value);
self.id2str.entry(key).or_insert_with(|| value.to_owned());
Ok(key)
}
} }
impl WritableEncodedStore for MemoryStoreIndexes { impl WritableEncodedStore for MemoryStoreIndexes {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), Infallible> {
insert_into_quad_map( insert_into_quad_map(
@ -913,14 +888,12 @@ fn remove_from_quad_map<T: Eq + Hash>(map1: &mut QuadMap<T>, e1: &T, e2: &T, e3:
} }
} }
fn option_set_flatten<'a, T: Clone>( fn option_set_flatten<'a, T: Clone>(i: Option<&'a HashSet<T>>) -> impl Iterator<Item = T> + 'a {
i: Option<&'a TrivialHashSet<T>>,
) -> impl Iterator<Item = T> + 'a {
i.into_iter().flat_map(|s| s.iter().cloned()) i.into_iter().flat_map(|s| s.iter().cloned())
} }
fn option_pair_map_flatten<'a, T: Copy>( fn option_pair_map_flatten<'a, T: Copy>(
i: Option<&'a TrivialHashMap<T, TrivialHashSet<T>>>, i: Option<&'a HashMap<T, HashSet<T>>>,
) -> impl Iterator<Item = (T, T)> + 'a { ) -> impl Iterator<Item = (T, T)> + 'a {
i.into_iter().flat_map(|kv| { i.into_iter().flat_map(|kv| {
kv.iter().flat_map(|(k, vs)| { kv.iter().flat_map(|(k, vs)| {
@ -1127,9 +1100,11 @@ impl Iterator for EncodedQuadsIter {
} }
} }
impl StrId for LargeSpur {}
// Isomorphism implementation // Isomorphism implementation
fn iso_canonicalize(g: &MemoryStore) -> Vec<Vec<u8>> { fn iso_canonicalize(g: &MemoryStore) -> Vec<String> {
let bnodes = bnodes(g); let bnodes = bnodes(g);
let (hash, partition) = hash_bnodes(g, bnodes.into_iter().map(|bnode| (bnode, 0)).collect()); let (hash, partition) = hash_bnodes(g, bnodes.into_iter().map(|bnode| (bnode, 0)).collect());
distinguish(g, &hash, &partition) distinguish(g, &hash, &partition)
@ -1137,9 +1112,9 @@ fn iso_canonicalize(g: &MemoryStore) -> Vec<Vec<u8>> {
fn distinguish( fn distinguish(
g: &MemoryStore, g: &MemoryStore,
hash: &TrivialHashMap<EncodedTerm, u64>, hash: &HashMap<EncodedTerm, u64>,
partition: &[(u64, Vec<EncodedTerm>)], partition: &[(u64, Vec<EncodedTerm>)],
) -> Vec<Vec<u8>> { ) -> Vec<String> {
let b_prime = partition let b_prime = partition
.iter() .iter()
.find_map(|(_, b)| if b.len() > 1 { Some(b) } else { None }); .find_map(|(_, b)| if b.len() > 1 { Some(b) } else { None });
@ -1171,41 +1146,36 @@ fn distinguish(
fn hash_bnodes( fn hash_bnodes(
g: &MemoryStore, g: &MemoryStore,
mut hashes: TrivialHashMap<EncodedTerm, u64>, mut hashes: HashMap<EncodedTerm, u64>,
) -> ( ) -> (HashMap<EncodedTerm, u64>, Vec<(u64, Vec<EncodedTerm>)>) {
TrivialHashMap<EncodedTerm, u64>,
Vec<(u64, Vec<EncodedTerm>)>,
) {
let mut to_hash = Vec::new(); let mut to_hash = Vec::new();
let mut partition: TrivialHashMap<u64, Vec<EncodedTerm>> = let mut partition: HashMap<u64, Vec<EncodedTerm>> = HashMap::new();
TrivialHashMap::with_hasher(BuildHasherDefault::<TrivialHasher>::default());
let mut partition_len = 0; let mut partition_len = 0;
loop { loop {
//TODO: improve termination //TODO: improve termination
let mut new_hashes = let mut new_hashes = HashMap::new();
TrivialHashMap::with_hasher(BuildHasherDefault::<TrivialHasher>::default());
for (bnode, old_hash) in &hashes { for (bnode, old_hash) in &hashes {
for q in g.encoded_quads_for_subject(*bnode) { for q in g.encoded_quads_for_subject(*bnode) {
to_hash.push(( to_hash.push((
hash_term(q.predicate, &hashes), hash_term(q.predicate, &hashes, g),
hash_term(q.object, &hashes), hash_term(q.object, &hashes, g),
hash_term(q.graph_name, &hashes), hash_term(q.graph_name, &hashes, g),
0, 0,
)); ));
} }
for q in g.encoded_quads_for_object(*bnode) { for q in g.encoded_quads_for_object(*bnode) {
to_hash.push(( to_hash.push((
hash_term(q.subject, &hashes), hash_term(q.subject, &hashes, g),
hash_term(q.predicate, &hashes), hash_term(q.predicate, &hashes, g),
hash_term(q.graph_name, &hashes), hash_term(q.graph_name, &hashes, g),
1, 1,
)); ));
} }
for q in g.encoded_quads_for_graph(*bnode) { for q in g.encoded_quads_for_graph(*bnode) {
to_hash.push(( to_hash.push((
hash_term(q.subject, &hashes), hash_term(q.subject, &hashes, g),
hash_term(q.predicate, &hashes), hash_term(q.predicate, &hashes, g),
hash_term(q.object, &hashes), hash_term(q.object, &hashes, g),
2, 2,
)); ));
} }
@ -1226,8 +1196,8 @@ fn hash_bnodes(
} }
} }
fn bnodes(g: &MemoryStore) -> TrivialHashSet<EncodedTerm> { fn bnodes(g: &MemoryStore) -> HashSet<EncodedTerm> {
let mut bnodes = TrivialHashSet::with_hasher(BuildHasherDefault::<TrivialHasher>::default()); let mut bnodes = HashSet::new();
for q in g.encoded_quads() { for q in g.encoded_quads() {
if q.subject.is_blank_node() { if q.subject.is_blank_node() {
bnodes.insert(q.subject); bnodes.insert(q.subject);
@ -1242,25 +1212,27 @@ fn bnodes(g: &MemoryStore) -> TrivialHashSet<EncodedTerm> {
bnodes bnodes
} }
fn label(g: &MemoryStore, hashes: &TrivialHashMap<EncodedTerm, u64>) -> Vec<Vec<u8>> { fn label(g: &MemoryStore, hashes: &HashMap<EncodedTerm, u64>) -> Vec<String> {
//TODO: better representation? //TODO: better representation?
let mut data: Vec<_> = g let mut data: Vec<_> = g
.encoded_quads() .encoded_quads()
.into_iter() .into_iter()
.map(|q| { .map(|q| {
let mut buffer = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE * 4); g.decode_quad(&EncodedQuad {
write_term(&mut buffer, map_term(q.subject, hashes)); subject: map_term(q.subject, hashes),
write_term(&mut buffer, map_term(q.predicate, hashes)); predicate: map_term(q.predicate, hashes),
write_term(&mut buffer, map_term(q.object, hashes)); object: map_term(q.object, hashes),
write_term(&mut buffer, map_term(q.graph_name, hashes)); graph_name: map_term(q.graph_name, hashes),
buffer })
.unwrap()
.to_string()
}) })
.collect(); .collect();
data.sort(); data.sort();
data data
} }
fn map_term(term: EncodedTerm, bnodes_hash: &TrivialHashMap<EncodedTerm, u64>) -> EncodedTerm { fn map_term(term: EncodedTerm, bnodes_hash: &HashMap<EncodedTerm, u64>) -> EncodedTerm {
if term.is_blank_node() { if term.is_blank_node() {
EncodedTerm::InlineBlankNode { EncodedTerm::InlineBlankNode {
id: (*bnodes_hash.get(&term).unwrap()).into(), id: (*bnodes_hash.get(&term).unwrap()).into(),
@ -1270,11 +1242,13 @@ fn map_term(term: EncodedTerm, bnodes_hash: &TrivialHashMap<EncodedTerm, u64>) -
} }
} }
fn hash_term(term: EncodedTerm, bnodes_hash: &TrivialHashMap<EncodedTerm, u64>) -> u64 { fn hash_term(term: EncodedTerm, bnodes_hash: &HashMap<EncodedTerm, u64>, g: &MemoryStore) -> u64 {
if term.is_blank_node() { if term.is_blank_node() {
*bnodes_hash.get(&term).unwrap() *bnodes_hash.get(&term).unwrap()
} else { } else if let Ok(term) = g.decode_term(term) {
hash_tuple(term) hash_tuple(term)
} else {
0
} }
} }
@ -1283,77 +1257,3 @@ fn hash_tuple(v: impl Hash) -> u64 {
v.hash(&mut hasher); v.hash(&mut hasher);
hasher.finish() hasher.finish()
} }
#[derive(Default)]
struct TrivialHasher {
value: u64,
}
#[allow(
arithmetic_overflow,
clippy::cast_sign_loss,
clippy::cast_possible_truncation
)]
impl Hasher for TrivialHasher {
fn finish(&self) -> u64 {
self.value
}
fn write(&mut self, bytes: &[u8]) {
for chunk in bytes.chunks(size_of::<u64>()) {
let mut val = [0; size_of::<u64>()];
val[0..chunk.len()].copy_from_slice(chunk);
self.write_u64(u64::from_le_bytes(val));
}
}
fn write_u8(&mut self, i: u8) {
self.write_u64(i.into());
}
fn write_u16(&mut self, i: u16) {
self.write_u64(i.into());
}
fn write_u32(&mut self, i: u32) {
self.write_u64(i.into());
}
fn write_u64(&mut self, i: u64) {
self.value ^= i;
}
fn write_u128(&mut self, i: u128) {
self.write_u64(i as u64);
self.write_u64((i >> 64) as u64);
}
fn write_usize(&mut self, i: usize) {
self.write_u64(i as u64);
self.write_u64((i >> 64) as u64);
}
fn write_i8(&mut self, i: i8) {
self.write_u8(i as u8);
}
fn write_i16(&mut self, i: i16) {
self.write_u16(i as u16);
}
fn write_i32(&mut self, i: i32) {
self.write_u32(i as u32);
}
fn write_i64(&mut self, i: i64) {
self.write_u64(i as u64);
}
fn write_i128(&mut self, i: i128) {
self.write_u128(i as u128);
}
fn write_isize(&mut self, i: isize) {
self.write_usize(i as usize);
}
}

@ -45,13 +45,13 @@ pub(crate) trait ReadableEncodedStore: StrLookup {
) -> Self::QuadsIter; ) -> Self::QuadsIter;
} }
pub(crate) trait WritableEncodedStore: StrContainer { pub(crate) trait WritableEncodedStore: WithStoreError {
fn insert_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>; fn insert_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>;
fn remove_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>; fn remove_encoded(&mut self, quad: &EncodedQuad<Self::StrId>) -> Result<(), Self::Error>;
} }
fn load_graph<S: WritableEncodedStore>( fn load_graph<S: WritableEncodedStore + StrContainer>(
store: &mut S, store: &mut S,
reader: impl BufRead, reader: impl BufRead,
format: GraphFormat, format: GraphFormat,
@ -72,7 +72,7 @@ fn load_graph<S: WritableEncodedStore>(
} }
} }
fn load_from_triple_parser<S: WritableEncodedStore, P: TriplesParser>( fn load_from_triple_parser<S: WritableEncodedStore + StrContainer, P: TriplesParser>(
store: &mut S, store: &mut S,
parser: Result<P, P::Error>, parser: Result<P, P::Error>,
to_graph_name: &GraphName, to_graph_name: &GraphName,
@ -137,7 +137,7 @@ fn map_xml_err(e: RdfXmlError) -> io::Error {
io::Error::new(io::ErrorKind::Other, e) // TODO: drop io::Error::new(io::ErrorKind::Other, e) // TODO: drop
} }
fn load_dataset<S: WritableEncodedStore>( fn load_dataset<S: WritableEncodedStore + StrContainer>(
store: &mut S, store: &mut S,
reader: impl BufRead, reader: impl BufRead,
format: DatasetFormat, format: DatasetFormat,
@ -150,7 +150,7 @@ fn load_dataset<S: WritableEncodedStore>(
} }
} }
fn load_from_quad_parser<S: WritableEncodedStore, P: QuadsParser>( fn load_from_quad_parser<S: WritableEncodedStore + StrContainer, P: QuadsParser>(
store: &mut S, store: &mut S,
parser: Result<P, P::Error>, parser: Result<P, P::Error>,
) -> Result<(), StoreOrParseError<S::Error, io::Error>> ) -> Result<(), StoreOrParseError<S::Error, io::Error>>

Loading…
Cancel
Save