Fixes invalid database generated from bulk load

Adds an invariant validation test
pull/190/head
Tpt 3 years ago
parent f6a8f57f78
commit 822dd60596
  1. 119
      lib/src/storage/mod.rs
  2. 6
      lib/src/store.rs
  3. 55
      lib/tests/store.rs

@ -9,7 +9,9 @@ use crate::storage::binary_encoder::{
WRITTEN_TERM_MAX_SIZE, WRITTEN_TERM_MAX_SIZE,
}; };
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError}; pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; use crate::storage::numeric_encoder::{
insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup,
};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -631,6 +633,117 @@ impl StorageReader {
self.reader self.reader
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) .contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
} }
/// Validates that all the storage invariants held in the data
pub fn validate(&self) -> Result<(), StorageError> {
// triples
let dspo_size = self.dspo_quads(&[]).count();
if dspo_size != self.dpos_quads(&[]).count() || dspo_size != self.dosp_quads(&[]).count() {
return Err(CorruptionError::new(
"Not the same number of triples in dspo, dpos and dosp",
)
.into());
}
for spo in self.dspo_quads(&[]) {
let spo = spo?;
self.decode_quad(&spo)?; // We ensure that the quad is readable
if !self.storage.db.contains_key(
&self.storage.dpos_cf,
&encode_term_triple(&spo.predicate, &spo.object, &spo.subject),
)? {
return Err(CorruptionError::new("Quad in dspo and not in dpos").into());
}
if !self.storage.db.contains_key(
&self.storage.dosp_cf,
&encode_term_triple(&spo.object, &spo.subject, &spo.predicate),
)? {
return Err(CorruptionError::new("Quad in dspo and not in dpos").into());
}
}
// quads
let gspo_size = self.gspo_quads(&[]).count();
if gspo_size != self.gpos_quads(&[]).count()
|| gspo_size != self.gosp_quads(&[]).count()
|| gspo_size != self.spog_quads(&[]).count()
|| gspo_size != self.posg_quads(&[]).count()
|| gspo_size != self.ospg_quads(&[]).count()
{
return Err(CorruptionError::new(
"Not the same number of triples in dspo, dpos and dosp",
)
.into());
}
for gspo in self.gspo_quads(&[]) {
let gspo = gspo?;
self.decode_quad(&gspo)?; // We ensure that the quad is readable
if !self.storage.db.contains_key(
&self.storage.gpos_cf,
&encode_term_quad(
&gspo.graph_name,
&gspo.predicate,
&gspo.object,
&gspo.subject,
),
)? {
return Err(CorruptionError::new("Quad in gspo and not in gpos").into());
}
if !self.storage.db.contains_key(
&self.storage.gosp_cf,
&encode_term_quad(
&gspo.graph_name,
&gspo.object,
&gspo.subject,
&gspo.predicate,
),
)? {
return Err(CorruptionError::new("Quad in gspo and not in gosp").into());
}
if !self.storage.db.contains_key(
&self.storage.spog_cf,
&encode_term_quad(
&gspo.subject,
&gspo.predicate,
&gspo.object,
&gspo.graph_name,
),
)? {
return Err(CorruptionError::new("Quad in gspo and not in spog").into());
}
if !self.storage.db.contains_key(
&self.storage.posg_cf,
&encode_term_quad(
&gspo.predicate,
&gspo.object,
&gspo.subject,
&gspo.graph_name,
),
)? {
return Err(CorruptionError::new("Quad in gspo and not in posg").into());
}
if !self.storage.db.contains_key(
&self.storage.ospg_cf,
&encode_term_quad(
&gspo.object,
&gspo.subject,
&gspo.predicate,
&gspo.graph_name,
),
)? {
return Err(CorruptionError::new("Quad in gspo and not in ospg").into());
}
if !self
.storage
.db
.contains_key(&self.storage.graphs_cf, &encode_term(&gspo.graph_name))?
{
return Err(
CorruptionError::new("Quad graph name in gspo and not in graphs").into(),
);
}
}
Ok(())
}
} }
pub struct ChainedDecodingQuadIterator { pub struct ChainedDecodingQuadIterator {
@ -1191,9 +1304,9 @@ impl BulkLoader {
self.build_sst_for_keys(self.quads.iter().map(|quad| { self.build_sst_for_keys(self.quads.iter().map(|quad| {
encode_term_quad( encode_term_quad(
&quad.graph_name, &quad.graph_name,
&quad.predicate,
&quad.object, &quad.object,
&quad.subject, &quad.subject,
&quad.predicate,
) )
}))?, }))?,
)); ));
@ -1223,9 +1336,9 @@ impl BulkLoader {
&self.storage.posg_cf, &self.storage.posg_cf,
self.build_sst_for_keys(self.quads.iter().map(|quad| { self.build_sst_for_keys(self.quads.iter().map(|quad| {
encode_term_quad( encode_term_quad(
&quad.predicate,
&quad.object, &quad.object,
&quad.subject, &quad.subject,
&quad.predicate,
&quad.graph_name, &quad.graph_name,
) )
}))?, }))?,

@ -857,6 +857,12 @@ impl Store {
pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> { pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
bulk_load::<StorageError, _, _>(&self.storage, quads.into_iter().map(Ok)) bulk_load::<StorageError, _, _>(&self.storage, quads.into_iter().map(Ok))
} }
/// Validates that all the store invariants held in the data
#[doc(hidden)]
pub fn validate(&self) -> Result<(), StorageError> {
self.storage.snapshot().validate()
}
} }
impl fmt::Display for Store { impl fmt::Display for Store {

@ -24,6 +24,21 @@ wd:Q90 a schema:City ;
schema:url "https://www.paris.fr/"^^xsd:anyURI ; schema:url "https://www.paris.fr/"^^xsd:anyURI ;
schema:postalCode "75001" . schema:postalCode "75001" .
"#; "#;
const GRAPH_DATA: &str = r#"
@prefix schema: <http://schema.org/> .
@prefix wd: <http://www.wikidata.org/entity/> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
GRAPH <http://www.wikidata.org/wiki/Special:EntityData/Q90> {
wd:Q90 a schema:City ;
schema:name "Paris"@fr , "la ville lumière"@fr ;
schema:country wd:Q142 ;
schema:population 2000000 ;
schema:startDate "-300"^^xsd:gYear ;
schema:url "https://www.paris.fr/"^^xsd:anyURI ;
schema:postalCode "75001" .
}
"#;
const NUMBER_OF_TRIPLES: usize = 8; const NUMBER_OF_TRIPLES: usize = 8;
fn quads(graph_name: impl Into<GraphNameRef<'static>>) -> Vec<QuadRef<'static>> { fn quads(graph_name: impl Into<GraphNameRef<'static>>) -> Vec<QuadRef<'static>> {
@ -91,26 +106,49 @@ fn test_load_graph() -> Result<(), Box<dyn Error>> {
for q in quads(GraphNameRef::DefaultGraph) { for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?); assert!(store.contains(q)?);
} }
store.validate()?;
Ok(()) Ok(())
} }
#[test] #[test]
fn test_load_dataset() -> Result<(), Box<dyn Error>> { fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store.load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; store.bulk_load_graph(
Cursor::new(DATA),
GraphFormat::Turtle,
GraphNameRef::DefaultGraph,
None,
)?;
for q in quads(GraphNameRef::DefaultGraph) { for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?); assert!(store.contains(q)?);
} }
store.validate()?;
Ok(())
}
#[test]
fn test_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?;
for q in quads(NamedNodeRef::new_unchecked(
"http://www.wikidata.org/wiki/Special:EntityData/Q90",
)) {
assert!(store.contains(q)?);
}
store.validate()?;
Ok(()) Ok(())
} }
#[test] #[test]
fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> { fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new().unwrap(); let store = Store::new().unwrap();
store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; store.bulk_load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?;
for q in quads(GraphNameRef::DefaultGraph) { for q in quads(NamedNodeRef::new_unchecked(
"http://www.wikidata.org/wiki/Special:EntityData/Q90",
)) {
assert!(store.contains(q)?); assert!(store.contains(q)?);
} }
store.validate()?;
Ok(()) Ok(())
} }
@ -171,12 +209,13 @@ fn test_snapshot_isolation_iterator() -> Result<(), Box<dyn Error>> {
NamedNodeRef::new_unchecked("http://example.com/s"), NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"), NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"), NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"), NamedNodeRef::new_unchecked("http://www.wikidata.org/wiki/Special:EntityData/Q90"),
); );
let store = Store::new()?; let store = Store::new()?;
store.insert(quad)?; store.insert(quad)?;
let iter = store.iter(); let iter = store.iter();
store.remove(quad)?; store.remove(quad)?;
store.validate()?;
assert_eq!( assert_eq!(
iter.collect::<Result<Vec<_>, _>>()?, iter.collect::<Result<Vec<_>, _>>()?,
vec![quad.into_owned()] vec![quad.into_owned()]
@ -190,7 +229,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dy
NamedNodeRef::new_unchecked("http://example.com/s"), NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"), NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"), NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"), NamedNodeRef::new_unchecked("http://www.wikidata.org/wiki/Special:EntityData/Q90"),
); );
let store = Store::new()?; let store = Store::new()?;
store.remove(quad)?; store.remove(quad)?;
@ -244,7 +283,9 @@ fn test_backup() -> Result<(), Box<dyn Error>> {
store.remove(quad)?; store.remove(quad)?;
assert!(!store.contains(quad)?); assert!(!store.contains(quad)?);
assert!(Store::open(&backup_dir.0)?.contains(quad)?); let backup = Store::open(&backup_dir.0)?;
backup.validate()?;
assert!(backup.contains(quad)?);
Ok(()) Ok(())
} }

Loading…
Cancel
Save