Removes string usage counting

Branch: pull/173/head
Author: Tpt, 3 years ago
Parent: 57b931c15f
Commit: b7ee3a6767
1. lib/src/storage/binary_encoder.rs (2 lines changed)
2. lib/src/storage/mod.rs (146 lines changed)
3. lib/src/storage/numeric_encoder.rs (34 lines changed)
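In short: before this change, every value in the id2str column family was a 4-byte big-endian i32 usage counter followed by the UTF-8 string, kept up to date through the str2id_merge merge operator and garbage-collected by the str2id_filter compaction filter; after it, the value is just the raw string, and entries are no longer removed when the quads referencing them are deleted. A minimal sketch of the old value layout follows (the helper names are illustrative only, not part of the codebase):

use std::convert::TryInto;

// Old id2str layout removed by this commit: 4-byte big-endian counter, then the UTF-8 bytes.
fn encode_counted(count: i32, value: &str) -> Vec<u8> {
    let mut buffer = Vec::with_capacity(value.len() + 4);
    buffer.extend_from_slice(&count.to_be_bytes());
    buffer.extend_from_slice(value.as_bytes());
    buffer
}

// A value whose counter had dropped to zero or below was treated as absent by get_str
// and eventually dropped by the compaction filter.
fn decode_counted(bytes: &[u8]) -> Option<String> {
    let count = i32::from_be_bytes(bytes[..4].try_into().unwrap());
    if count > 0 {
        Some(String::from_utf8(bytes[4..].to_vec()).unwrap())
    } else {
        None
    }
}

fn main() {
    let encoded = encode_counted(1, "http://example.com/s");
    assert_eq!(decode_counted(&encoded), Some("http://example.com/s".to_string()));
    assert_eq!(decode_counted(&encode_counted(0, "gone")), None);
}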

lib/src/storage/binary_encoder.rs
@@ -7,7 +7,7 @@ use std::io::{Cursor, Read};
 use std::mem::size_of;
 use std::rc::Rc;
 
-pub const LATEST_STORAGE_VERSION: u64 = 2;
+pub const LATEST_STORAGE_VERSION: u64 = 1;
 pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();
 
 // Encoded term type blocks

lib/src/storage/mod.rs
@@ -7,9 +7,7 @@ use crate::storage::binary_encoder::{
     write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding,
     LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE,
 };
-use crate::storage::numeric_encoder::{
-    insert_term, remove_term, EncodedQuad, EncodedTerm, StrHash, StrLookup,
-};
+use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
 use backend::{
     ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
     MergeOperator,
@@ -77,8 +75,8 @@ impl Storage {
         vec![
             ColumnFamilyDefinition {
                 name: ID2STR_CF,
-                merge_operator: Some(Self::str2id_merge()),
-                compaction_filter: Some(Self::str2id_filter()),
+                merge_operator: None,
+                compaction_filter: None,
                 use_iter: false,
                 min_prefix_size: 0,
             },
@@ -155,51 +153,6 @@ impl Storage {
         ]
     }
 
-    fn str2id_merge() -> MergeOperator {
-        fn merge_counted_values<'a>(values: impl Iterator<Item = &'a [u8]>) -> Vec<u8> {
-            let (counter, str) =
-                values.fold((0_i32, [].as_ref()), |(prev_counter, prev_str), current| {
-                    let new_counter = i32::from_be_bytes(current[..4].try_into().unwrap());
-                    (
-                        if prev_counter == i32::MAX {
-                            i32::MAX // We keep to max, no counting
-                        } else {
-                            prev_counter.saturating_add(new_counter)
-                        },
-                        if prev_str.is_empty() {
-                            &current[4..]
-                        } else {
-                            prev_str
-                        },
-                    )
-                });
-            let mut buffer = Vec::with_capacity(str.len() + 4);
-            buffer.extend_from_slice(&counter.to_be_bytes());
-            buffer.extend_from_slice(str);
-            buffer
-        }
-
-        MergeOperator {
-            full: |_, previous, values| merge_counted_values(previous.into_iter().chain(values)),
-            partial: |_, values| merge_counted_values(values),
-            name: CString::new("id2str_merge").unwrap(),
-        }
-    }
-
-    fn str2id_filter() -> CompactionFilter {
-        CompactionFilter {
-            filter: |_, value| {
-                let counter = i32::from_be_bytes(value[..4].try_into().unwrap());
-                if counter > 0 {
-                    CompactionAction::Keep
-                } else {
-                    CompactionAction::Remove
-                }
-            },
-            name: CString::new("id2str_compaction_filter").unwrap(),
-        }
-    }
-
     fn setup(db: Db) -> Result<Self> {
         let this = Self {
             default_cf: db.column_family(DEFAULT_CF).unwrap(),
@@ -239,31 +192,6 @@ impl Storage {
             version = 1;
             this.update_version(version)?;
         }
 
-        if version == 1 {
-            // We migrate to v2
-            let mut transaction = this.db.transaction();
-            let reader = this.db.snapshot();
-            let mut size = 0;
-            let mut iter = reader.iter(&this.id2str_cf)?;
-            while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
-                let mut new_value = Vec::with_capacity(value.len() + 4);
-                new_value.extend_from_slice(&i32::MAX.to_be_bytes());
-                new_value.extend_from_slice(value);
-                transaction.insert(&this.id2str_cf, key, &new_value)?;
-                iter.next();
-                size += 1;
-                if size % AUTO_WRITE_BATCH_THRESHOLD == 0 {
-                    let mut tr = this.db.transaction();
-                    swap(&mut transaction, &mut tr);
-                    tr.commit()?;
-                }
-            }
-            transaction.commit()?;
-            iter.status()?;
-            this.db.flush(&this.id2str_cf)?;
-            version = 2;
-            this.update_version(version)?;
-        }
         match version {
             _ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!(
@@ -675,25 +603,14 @@ impl StorageReader {
     pub fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
         self.reader
             .get(&self.storage.id2str_cf, &key.to_be_bytes())?
-            .and_then(|v| {
-                let count = i32::from_be_bytes(v[..4].try_into().unwrap());
-                if count > 0 {
-                    Some(String::from_utf8(v[4..].to_vec()))
-                } else {
-                    None
-                }
-            })
+            .map(|v| String::from_utf8(v.to_vec()))
             .transpose()
             .map_err(invalid_data_error)
     }
 
     pub fn contains_str(&self, key: &StrHash) -> Result<bool> {
-        Ok(self
-            .reader
-            .get(&self.storage.id2str_cf, &key.to_be_bytes())?
-            .map_or(false, |v| {
-                i32::from_be_bytes(v[..4].try_into().unwrap()) > 0
-            }))
+        self.reader
+            .contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
     }
 }
@@ -914,11 +831,11 @@ impl StorageWriter {
     }
 
     fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<()> {
-        self.buffer.clear();
-        self.buffer.extend_from_slice(&1_i32.to_be_bytes());
-        self.buffer.extend_from_slice(value.as_bytes());
-        self.transaction
-            .merge(&self.storage.id2str_cf, &key.to_be_bytes(), &self.buffer)
+        self.transaction.insert(
+            &self.storage.id2str_cf,
+            &key.to_be_bytes(),
+            value.as_bytes(),
+        )
     }
 
     pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool> {
@@ -946,10 +863,6 @@ impl StorageWriter {
             write_osp_quad(&mut self.buffer, quad);
             self.transaction
                 .remove(&self.storage.dosp_cf, &self.buffer)?;
-            self.remove_term(&quad.subject)?;
-            self.remove_term(&quad.predicate)?;
-            self.remove_term(&quad.object)?;
             true
         } else {
             false
@@ -988,10 +901,6 @@ impl StorageWriter {
             write_gosp_quad(&mut self.buffer, quad);
             self.transaction
                 .remove(&self.storage.gosp_cf, &self.buffer)?;
-            self.remove_term(&quad.subject)?;
-            self.remove_term(&quad.predicate)?;
-            self.remove_term(&quad.object)?;
             true
         } else {
             false
@@ -1037,7 +946,6 @@ impl StorageWriter {
         {
             self.transaction
                 .remove(&self.storage.graphs_cf, &self.buffer)?;
-            self.remove_term(graph_name)?;
             true
         } else {
             false
@@ -1062,18 +970,6 @@ impl StorageWriter {
         Ok(())
     }
 
-    fn remove_term(&mut self, encoded: &EncodedTerm) -> Result<()> {
-        remove_term(encoded, &mut |key| self.remove_str(key))
-    }
-
-    fn remove_str(&mut self, key: &StrHash) -> Result<()> {
-        self.transaction.merge(
-            &self.storage.id2str_cf,
-            &key.to_be_bytes(),
-            &(-1_i32).to_be_bytes(),
-        )
-    }
-
     pub fn commit(self) -> Result<()> {
         self.transaction.commit()
     }
@@ -1088,7 +984,7 @@ impl StorageWriter {
 pub struct BulkLoader<'a> {
     storage: &'a Storage,
     reader: Reader,
-    id2str: HashMap<StrHash, (i32, Box<str>)>,
+    id2str: HashMap<StrHash, Box<str>>,
     quads: HashSet<EncodedQuad>,
     triples: HashSet<EncodedQuad>,
     graphs: HashSet<EncodedTerm>,
@@ -1175,12 +1071,8 @@ impl<'a> BulkLoader<'a> {
                 .collect::<Vec<_>>();
             id2str.sort();
             let mut id2str_sst = self.storage.db.new_sst_file()?;
-            let mut buffer = Vec::new();
-            for (k, (count, v)) in id2str {
-                buffer.extend_from_slice(&count.to_be_bytes());
-                buffer.extend_from_slice(v.as_bytes());
-                id2str_sst.merge(&k, &buffer)?;
-                buffer.clear();
+            for (k, v) in id2str {
+                id2str_sst.insert(&k, v.as_bytes())?;
             }
             to_load.push((&self.storage.id2str_cf, id2str_sst.finish()?));
         }
@@ -1294,15 +1186,7 @@ impl<'a> BulkLoader<'a> {
     fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
         insert_term(term, encoded, &mut |key, value| {
-            match self.id2str.entry(*key) {
-                hash_map::Entry::Occupied(mut e) => {
-                    let e = e.get_mut();
-                    e.0 = e.0.wrapping_add(1);
-                }
-                hash_map::Entry::Vacant(e) => {
-                    e.insert((1, value.into()));
-                }
-            };
+            self.id2str.entry(*key).or_insert_with(|| value.into());
             Ok(())
         })
     }

lib/src/storage/numeric_encoder.rs
@@ -752,40 +752,6 @@ pub fn insert_term<E, F: FnMut(&StrHash, &str) -> Result<(), E>>(
     }
 }
 
-pub fn remove_term<E, F: FnMut(&StrHash) -> Result<(), E>>(
-    encoded: &EncodedTerm,
-    remove_str: &mut F,
-) -> Result<(), E> {
-    match encoded {
-        EncodedTerm::NamedNode { iri_id } => remove_str(iri_id),
-        EncodedTerm::BigBlankNode { id_id } => remove_str(id_id),
-        EncodedTerm::BigStringLiteral { value_id }
-        | EncodedTerm::BigSmallLangStringLiteral { value_id, .. } => remove_str(value_id),
-        EncodedTerm::SmallBigLangStringLiteral { language_id, .. } => remove_str(language_id),
-        EncodedTerm::BigBigLangStringLiteral {
-            value_id,
-            language_id,
-        } => {
-            remove_str(value_id)?;
-            remove_str(language_id)
-        }
-        EncodedTerm::SmallTypedLiteral { datatype_id, .. } => remove_str(datatype_id),
-        EncodedTerm::BigTypedLiteral {
-            value_id,
-            datatype_id,
-        } => {
-            remove_str(value_id)?;
-            remove_str(datatype_id)
-        }
-        EncodedTerm::Triple(encoded) => {
-            remove_term(&encoded.subject, remove_str)?;
-            remove_term(&encoded.predicate, remove_str)?;
-            remove_term(&encoded.object, remove_str)
-        }
-        _ => Ok(()),
-    }
-}
-
 pub fn parse_boolean_str(value: &str) -> Option<EncodedTerm> {
     match value {
         "true" | "1" => Some(EncodedTerm::BooleanLiteral(true)),
