From 822dd60596d12650add31a2ce96f3ab92b80aece Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 2 Feb 2022 19:37:08 +0100 Subject: [PATCH] Fixes invalid database generated from bulk load Adds an invariant validation test --- lib/src/storage/mod.rs | 119 +++++++++++++++++++++++++++++++++++++++-- lib/src/store.rs | 6 +++ lib/tests/store.rs | 55 ++++++++++++++++--- 3 files changed, 170 insertions(+), 10 deletions(-) diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 5b32afd6..156e3da2 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -9,7 +9,9 @@ use crate::storage::binary_encoder::{ WRITTEN_TERM_MAX_SIZE, }; pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError}; -use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; +use crate::storage::numeric_encoder::{ + insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup, +}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; #[cfg(not(target_arch = "wasm32"))] use std::collections::{HashMap, HashSet}; @@ -631,6 +633,117 @@ impl StorageReader { self.reader .contains_key(&self.storage.id2str_cf, &key.to_be_bytes()) } + + /// Validates that all the storage invariants held in the data + pub fn validate(&self) -> Result<(), StorageError> { + // triples + let dspo_size = self.dspo_quads(&[]).count(); + if dspo_size != self.dpos_quads(&[]).count() || dspo_size != self.dosp_quads(&[]).count() { + return Err(CorruptionError::new( + "Not the same number of triples in dspo, dpos and dosp", + ) + .into()); + } + for spo in self.dspo_quads(&[]) { + let spo = spo?; + self.decode_quad(&spo)?; // We ensure that the quad is readable + if !self.storage.db.contains_key( + &self.storage.dpos_cf, + &encode_term_triple(&spo.predicate, &spo.object, &spo.subject), + )? 
{ + return Err(CorruptionError::new("Quad in dspo and not in dpos").into()); + } + if !self.storage.db.contains_key( + &self.storage.dosp_cf, + &encode_term_triple(&spo.object, &spo.subject, &spo.predicate), + )? { + return Err(CorruptionError::new("Quad in dspo and not in dosp").into()); + } + } + + // quads + let gspo_size = self.gspo_quads(&[]).count(); + if gspo_size != self.gpos_quads(&[]).count() + || gspo_size != self.gosp_quads(&[]).count() + || gspo_size != self.spog_quads(&[]).count() + || gspo_size != self.posg_quads(&[]).count() + || gspo_size != self.ospg_quads(&[]).count() + { + return Err(CorruptionError::new( + "Not the same number of quads in gspo, gpos, gosp, spog, posg and ospg", + ) + .into()); + } + for gspo in self.gspo_quads(&[]) { + let gspo = gspo?; + self.decode_quad(&gspo)?; // We ensure that the quad is readable + if !self.storage.db.contains_key( + &self.storage.gpos_cf, + &encode_term_quad( + &gspo.graph_name, + &gspo.predicate, + &gspo.object, + &gspo.subject, + ), + )? { + return Err(CorruptionError::new("Quad in gspo and not in gpos").into()); + } + if !self.storage.db.contains_key( + &self.storage.gosp_cf, + &encode_term_quad( + &gspo.graph_name, + &gspo.object, + &gspo.subject, + &gspo.predicate, + ), + )? { + return Err(CorruptionError::new("Quad in gspo and not in gosp").into()); + } + if !self.storage.db.contains_key( + &self.storage.spog_cf, + &encode_term_quad( + &gspo.subject, + &gspo.predicate, + &gspo.object, + &gspo.graph_name, + ), + )? { + return Err(CorruptionError::new("Quad in gspo and not in spog").into()); + } + if !self.storage.db.contains_key( + &self.storage.posg_cf, + &encode_term_quad( + &gspo.predicate, + &gspo.object, + &gspo.subject, + &gspo.graph_name, + ), + )? { + return Err(CorruptionError::new("Quad in gspo and not in posg").into()); + } + if !self.storage.db.contains_key( + &self.storage.ospg_cf, + &encode_term_quad( + &gspo.object, + &gspo.subject, + &gspo.predicate, + &gspo.graph_name, + ), + )? 
{ + return Err(CorruptionError::new("Quad in gspo and not in ospg").into()); + } + if !self + .storage + .db + .contains_key(&self.storage.graphs_cf, &encode_term(&gspo.graph_name))? + { + return Err( + CorruptionError::new("Quad graph name in gspo and not in graphs").into(), + ); + } + } + Ok(()) + } } pub struct ChainedDecodingQuadIterator { @@ -1191,9 +1304,9 @@ impl BulkLoader { self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( &quad.graph_name, + &quad.predicate, &quad.object, &quad.subject, - &quad.predicate, ) }))?, )); @@ -1223,9 +1336,9 @@ impl BulkLoader { &self.storage.posg_cf, self.build_sst_for_keys(self.quads.iter().map(|quad| { encode_term_quad( + &quad.predicate, &quad.object, &quad.subject, - &quad.predicate, &quad.graph_name, ) }))?, diff --git a/lib/src/store.rs b/lib/src/store.rs index c15d2c65..d3ff304e 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -857,6 +857,12 @@ impl Store { pub fn bulk_extend(&self, quads: impl IntoIterator) -> Result<(), StorageError> { bulk_load::(&self.storage, quads.into_iter().map(Ok)) } + + /// Validates that all the store invariants held in the data + #[doc(hidden)] + pub fn validate(&self) -> Result<(), StorageError> { + self.storage.snapshot().validate() + } } impl fmt::Display for Store { diff --git a/lib/tests/store.rs b/lib/tests/store.rs index 5d651db8..578bc860 100644 --- a/lib/tests/store.rs +++ b/lib/tests/store.rs @@ -24,6 +24,21 @@ wd:Q90 a schema:City ; schema:url "https://www.paris.fr/"^^xsd:anyURI ; schema:postalCode "75001" . "#; +const GRAPH_DATA: &str = r#" +@prefix schema: . +@prefix wd: . +@prefix xsd: . + +GRAPH { + wd:Q90 a schema:City ; + schema:name "Paris"@fr , "la ville lumière"@fr ; + schema:country wd:Q142 ; + schema:population 2000000 ; + schema:startDate "-300"^^xsd:gYear ; + schema:url "https://www.paris.fr/"^^xsd:anyURI ; + schema:postalCode "75001" . 
+} +"#; const NUMBER_OF_TRIPLES: usize = 8; fn quads(graph_name: impl Into>) -> Vec> { @@ -91,26 +106,49 @@ fn test_load_graph() -> Result<(), Box> { for q in quads(GraphNameRef::DefaultGraph) { assert!(store.contains(q)?); } + store.validate()?; Ok(()) } #[test] -fn test_load_dataset() -> Result<(), Box> { +fn test_bulk_load_graph() -> Result<(), Box> { let store = Store::new()?; - store.load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; + store.bulk_load_graph( + Cursor::new(DATA), + GraphFormat::Turtle, + GraphNameRef::DefaultGraph, + None, + )?; for q in quads(GraphNameRef::DefaultGraph) { assert!(store.contains(q)?); } + store.validate()?; + Ok(()) +} + +#[test] +fn test_load_dataset() -> Result<(), Box> { + let store = Store::new()?; + store.load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?; + for q in quads(NamedNodeRef::new_unchecked( + "http://www.wikidata.org/wiki/Special:EntityData/Q90", + )) { + assert!(store.contains(q)?); + } + store.validate()?; Ok(()) } #[test] fn test_bulk_load_dataset() -> Result<(), Box> { let store = Store::new().unwrap(); - store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; - for q in quads(GraphNameRef::DefaultGraph) { + store.bulk_load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?; + for q in quads(NamedNodeRef::new_unchecked( + "http://www.wikidata.org/wiki/Special:EntityData/Q90", + )) { assert!(store.contains(q)?); } + store.validate()?; Ok(()) } @@ -171,12 +209,13 @@ fn test_snapshot_isolation_iterator() -> Result<(), Box> { NamedNodeRef::new_unchecked("http://example.com/s"), NamedNodeRef::new_unchecked("http://example.com/p"), NamedNodeRef::new_unchecked("http://example.com/o"), - NamedNodeRef::new_unchecked("http://example.com/g"), + NamedNodeRef::new_unchecked("http://www.wikidata.org/wiki/Special:EntityData/Q90"), ); let store = Store::new()?; store.insert(quad)?; let iter = store.iter(); store.remove(quad)?; + store.validate()?; assert_eq!( 
iter.collect::, _>>()?, vec![quad.into_owned()] @@ -190,7 +229,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box Result<(), Box> { store.remove(quad)?; assert!(!store.contains(quad)?); - assert!(Store::open(&backup_dir.0)?.contains(quad)?); + let backup = Store::open(&backup_dir.0)?; + backup.validate()?; + assert!(backup.contains(quad)?); Ok(()) }