diff --git a/lib/benches/store.rs b/lib/benches/store.rs
index e062583b..94f65d0f 100644
--- a/lib/benches/store.rs
+++ b/lib/benches/store.rs
@@ -75,7 +75,8 @@ fn do_load(store: &Store, data: &[u8]) {

 fn do_bulk_load(store: &Store, data: &[u8]) {
     store
-        .bulk_load_graph(
+        .bulk_loader()
+        .load_graph(
             Cursor::new(&data),
             GraphFormat::NTriples,
             GraphNameRef::DefaultGraph,
diff --git a/lib/spargebra/src/parser.rs b/lib/spargebra/src/parser.rs
index b23aa644..0e958332 100644
--- a/lib/spargebra/src/parser.rs
+++ b/lib/spargebra/src/parser.rs
@@ -1265,7 +1265,7 @@ parser! {
         } /
         i:InsertClause() { (None, Some(i)) }

-        rule Modify_clear() -> () = {
+        rule Modify_clear() = {
            state.used_bnodes.clear();
            state.currently_used_bnodes.clear();
        }
diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs
index fbdc8e7f..9a775c25 100644
--- a/lib/src/storage/mod.rs
+++ b/lib/src/storage/mod.rs
@@ -13,6 +13,7 @@ use crate::storage::numeric_encoder::{
     insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup,
 };
 use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
+use std::collections::VecDeque;
 #[cfg(not(target_arch = "wasm32"))]
 use std::collections::{HashMap, HashSet};
 use std::error::Error;
@@ -20,8 +21,11 @@ use std::error::Error;
 use std::mem::take;
 #[cfg(not(target_arch = "wasm32"))]
 use std::path::{Path, PathBuf};
+use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::Arc;
 #[cfg(not(target_arch = "wasm32"))]
 use std::thread::spawn;
+use std::thread::JoinHandle;

 mod backend;
 mod binary_encoder;
@@ -42,7 +46,7 @@ const DOSP_CF: &str = "dosp";
 const GRAPHS_CF: &str = "graphs";
 const DEFAULT_CF: &str = "default";
 #[cfg(not(target_arch = "wasm32"))]
-const BULK_LOAD_BATCH_SIZE: usize = 1024 * 1024;
+const BULK_LOAD_BATCH_SIZE: usize = 1_000_000;

 /// Low level storage primitives
 #[derive(Clone)]
@@ -1149,47 +1153,109 @@ impl<'a> StorageWriter<'a> {
     }
 }

-/// Creates a database from a dataset files.
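+/// Loads quads into the storage by batches of `BULK_LOAD_BATCH_SIZE` elements,
+/// spreading the batches over multiple worker threads and reporting progress to the registered hooks.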
 #[cfg(not(target_arch = "wasm32"))]
-pub fn bulk_load<
-    EI,
-    EO: From<EI> + From<StorageError>,
-    I: IntoIterator<Item = Result<Quad, EI>>,
->(
-    storage: &Storage,
-    quads: I,
-) -> Result<(), EO> {
-    let mut threads = Vec::new();
-    let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE);
-    for quad in quads {
-        let quad = quad?;
-        buffer.push(quad);
-        if buffer.len() >= BULK_LOAD_BATCH_SIZE {
-            let buffer = take(&mut buffer);
-            let storage = storage.clone();
-            threads.push(spawn(move || BulkLoader::new(storage).load(buffer)));
+pub struct StorageBulkLoader {
+    storage: Storage,
+    hooks: Vec<Box<dyn Fn(u64)>>,
+    num_threads: usize,
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+impl StorageBulkLoader {
+    pub fn new(storage: Storage) -> Self {
+        Self {
+            storage,
+            hooks: Vec::new(),
+            num_threads: num_cpus::get() * 4,
+        }
+    }
+
+    pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
+        self.hooks.push(Box::new(callback));
+        self
+    }
+
+    pub fn load<EI, EO: From<EI> + From<StorageError>, I: IntoIterator<Item = Result<Quad, EI>>>(
+        &self,
+        quads: I,
+    ) -> Result<(), EO> {
+        let mut threads = VecDeque::with_capacity(self.num_threads);
+        let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE);
+        let done_counter = Arc::new(AtomicU64::new(0));
+        let mut done_and_displayed_counter = 0;
+        for quad in quads {
+            let quad = quad?;
+            buffer.push(quad);
+            if buffer.len() >= BULK_LOAD_BATCH_SIZE {
+                self.spawn_load_thread(
+                    &mut buffer,
+                    &mut threads,
+                    &done_counter,
+                    &mut done_and_displayed_counter,
+                )?;
+            }
+        }
+        self.spawn_load_thread(
+            &mut buffer,
+            &mut threads,
+            &done_counter,
+            &mut done_and_displayed_counter,
+        )?;
+        for thread in threads {
+            thread.join().unwrap()?;
+            self.on_possible_progress(&done_counter, &mut done_and_displayed_counter);
+        }
+        Ok(())
+    }
+
+    fn spawn_load_thread(
+        &self,
+        buffer: &mut Vec<Quad>,
+        threads: &mut VecDeque<JoinHandle<Result<(), StorageError>>>,
+        done_counter: &Arc<AtomicU64>,
+        done_and_displayed_counter: &mut u64,
+    ) -> Result<(), StorageError> {
+        self.on_possible_progress(done_counter, done_and_displayed_counter);
+        // We avoid having too many threads running at the same time
+        if threads.len() >= self.num_threads {
+            if let Some(thread) = threads.pop_front() {
+                thread.join().unwrap()?;
+                self.on_possible_progress(done_counter, done_and_displayed_counter);
+            }
         }
+        let buffer = take(buffer);
+        let storage = self.storage.clone();
+        let done_counter_clone = done_counter.clone();
+        threads.push_back(spawn(move || {
+            FileBulkLoader::new(storage).load(buffer, &done_counter_clone)
+        }));
+        self.on_possible_progress(done_counter, done_and_displayed_counter);
+        Ok(())
     }
-    BulkLoader::new(storage.clone()).load(buffer)?; // Last buffer
-    for thread in threads {
-        thread.join().unwrap()?;
+
+    fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) {
+        let new_counter = done.load(Ordering::Relaxed);
+        let display_step = u64::try_from(BULK_LOAD_BATCH_SIZE).unwrap();
+        // Calls the hooks only when the counter crosses a multiple of the batch size
+        if new_counter / display_step > *done_and_displayed / display_step {
+            for hook in &self.hooks {
+                hook(new_counter);
+            }
+        }
+        *done_and_displayed = new_counter;
     }
-    Ok(())
 }

-/// Creates a database from a dataset files.
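+/// Loads a single batch of quads: encodes all terms and quads in memory first,
+/// then saves them to the storage in one go.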
 #[cfg(not(target_arch = "wasm32"))]
-struct BulkLoader {
+struct FileBulkLoader {
     storage: Storage,
     id2str: HashMap<StrHash, Box<str>>,
     quads: HashSet<EncodedQuad>,
     triples: HashSet<EncodedQuad>,
     graphs: HashSet<EncodedTerm>,
-    buffer: Vec<u8>,
 }

 #[cfg(not(target_arch = "wasm32"))]
-impl BulkLoader {
+impl FileBulkLoader {
     fn new(storage: Storage) -> Self {
         Self {
             storage,
@@ -1197,30 +1263,42 @@ impl BulkLoader {
             quads: HashSet::default(),
             triples: HashSet::default(),
             graphs: HashSet::default(),
-            buffer: Vec::new(),
         }
     }

-    fn load(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
+    fn load(
+        &mut self,
+        quads: impl IntoIterator<Item = Quad>,
+        counter: &AtomicU64,
+    ) -> Result<(), StorageError> {
+        self.encode(quads)?;
+        let size = self.triples.len() + self.quads.len();
+        self.save()?;
+        counter.fetch_add(size.try_into().unwrap(), Ordering::Relaxed);
+        Ok(())
+    }
+
+    fn encode(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
+        let mut buffer = Vec::new();
         for quad in quads {
             let encoded = EncodedQuad::from(quad.as_ref());
-            self.buffer.clear();
+            buffer.clear();
             if quad.graph_name.is_default_graph() {
-                write_spo_quad(&mut self.buffer, &encoded);
+                write_spo_quad(&mut buffer, &encoded);
                 if self.triples.insert(encoded.clone()) {
                     self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
                     self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
                     self.insert_term(quad.object.as_ref(), &encoded.object)?;
                 }
             } else {
-                write_spog_quad(&mut self.buffer, &encoded);
+                write_spog_quad(&mut buffer, &encoded);
                 if self.quads.insert(encoded.clone()) {
                     self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
                     self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
                     self.insert_term(quad.object.as_ref(), &encoded.object)?;
-                    self.buffer.clear();
-                    write_term(&mut self.buffer, &encoded.graph_name);
+                    buffer.clear();
+                    write_term(&mut buffer, &encoded.graph_name);
                     if self.graphs.insert(encoded.graph_name.clone()) {
                         self.insert_term(
                             match quad.graph_name.as_ref() {
@@ -1234,7 +1312,7 @@ impl BulkLoader {
                 }
             }
         }
-        self.save()
+        Ok(())
     }

     fn save(&mut self) -> Result<(), StorageError> {
diff --git a/lib/src/store.rs b/lib/src/store.rs
index 1f3d6003..879fd79c 100644
--- a/lib/src/store.rs
+++ b/lib/src/store.rs
@@ -32,9 +32,9 @@ use crate::sparql::{
     evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
     UpdateOptions,
 };
-#[cfg(not(target_arch = "wasm32"))]
-use crate::storage::bulk_load;
 use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
+#[cfg(not(target_arch = "wasm32"))]
+use crate::storage::StorageBulkLoader;
 use crate::storage::{
     ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
 };
@@ -365,7 +365,7 @@ impl Store {
     /// Loads a graph file (i.e. triples) into the store.
     ///
-    /// This function is atomic, quite slow and memory hungry. To get much better performances you might want to use [`bulk_load_graph`](Store::bulk_load_graph).
+    /// This function is atomic, quite slow and memory hungry. To get much better performance you might want to use the [`bulk_loader`](Store::bulk_loader).
     ///
     /// Usage example:
     /// ```
@@ -411,7 +411,7 @@ impl Store {
     /// Loads a dataset file (i.e. quads) into the store.
     ///
-    /// This function is atomic, quite slow and memory hungry. To get much better performances you might want to [`bulk_load_dataset`](Store::bulk_load_dataset).
+    /// This function is atomic, quite slow and memory hungry. To get much better performance you might want to use the [`bulk_loader`](Store::bulk_loader).
     ///
     /// Usage example:
     /// ```
@@ -477,7 +477,7 @@ impl Store {
     /// Adds atomically a set of quads to this store.
     ///
-    /// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`](Store::bulk_extend) if you plan to add ten of millions of triples.
+    /// Warning: This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add tens of millions of triples.
     pub fn extend(
         &self,
         quads: impl IntoIterator<Item = impl Into<Quad>>,
@@ -753,15 +753,7 @@ impl Store {
         self.storage.backup(target_directory.as_ref())
     }

-    /// Loads a dataset file efficiently into the store.
-    ///
-    /// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient.
-    ///
-    /// Warning: This method is not atomic.
-    /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
-    /// Results might get weird if you delete data during the loading process.
-    ///
-    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
+    /// Creates a bulk loader able to load a lot of data quickly into the store.
     ///
     /// Usage example:
     /// ```
@@ -771,9 +763,9 @@ impl Store {
     ///
     /// let store = Store::new()?;
     ///
-    /// // insertion
+    /// // quads file insertion
     /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
-    /// store.bulk_load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
+    /// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
     ///
     /// // we inspect the store contents
     /// let ex = NamedNodeRef::new("http://example.com")?;
@@ -781,81 +773,10 @@ impl Store {
     /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
     /// ```
     #[cfg(not(target_arch = "wasm32"))]
-    pub fn bulk_load_dataset(
-        &self,
-        reader: impl BufRead,
-        format: DatasetFormat,
-        base_iri: Option<&str>,
-    ) -> Result<(), LoaderError> {
-        let mut parser = DatasetParser::from_format(format);
-        if let Some(base_iri) = base_iri {
-            parser = parser
-                .with_base_iri(base_iri)
-                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
+    pub fn bulk_loader(&self) -> BulkLoader {
+        BulkLoader {
+            storage: StorageBulkLoader::new(self.storage.clone()),
         }
-        bulk_load(&self.storage, parser.read_quads(reader)?)
-    }
-
-    /// Loads a dataset file efficiently into the store.
-    ///
-    /// This function is optimized for large dataset loading speed. For small files, [`load_graph`](Store::load_graph) might be more convenient.
-    ///
-    /// Warning: This method is not atomic.
-    /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
-    /// Results might get weird if you delete data during the loading process.
-    ///
-    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
-    ///
-    /// Usage example:
-    /// ```
-    /// use oxigraph::store::Store;
-    /// use oxigraph::io::GraphFormat;
-    /// use oxigraph::model::*;
-    ///
-    /// let store = Store::new()?;
-    ///
-    /// // insertion
-    /// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
-    /// store.bulk_load_graph(file.as_ref(), GraphFormat::NTriples, GraphNameRef::DefaultGraph, None)?;
-    ///
-    /// // we inspect the store contents
-    /// let ex = NamedNodeRef::new("http://example.com")?;
-    /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
-    /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
-    /// ```
-    #[cfg(not(target_arch = "wasm32"))]
-    pub fn bulk_load_graph<'a>(
-        &self,
-        reader: impl BufRead,
-        format: GraphFormat,
-        to_graph_name: impl Into<GraphNameRef<'a>>,
-        base_iri: Option<&str>,
-    ) -> Result<(), LoaderError> {
-        let mut parser = GraphParser::from_format(format);
-        if let Some(base_iri) = base_iri {
-            parser = parser
-                .with_base_iri(base_iri)
-                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
-        }
-        let to_graph_name = to_graph_name.into();
-        bulk_load(
-            &self.storage,
-            parser
-                .read_triples(reader)?
-                .map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))),
-        )
-    }
-
-    /// Adds a set of triples to this store using bulk load.
-    ///
-    /// Warning: This method is not atomic.
-    /// If the process fails in the middle of the file, only a part of the data may be written to the store.
-    /// Results might get weird if you delete data during the loading process.
-    ///
-    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
-    #[cfg(not(target_arch = "wasm32"))]
-    pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
-        bulk_load::<StorageError, _, _>(&self.storage, quads.into_iter().map(Ok))
-    }
     }

     /// Validates that all the store invariants held in the data
@@ -1155,9 +1076,7 @@ impl<'a> Transaction<'a> {
         self.writer.insert(quad.into())
     }

-    /// Adds atomically a set of quads to this store.
-    ///
-    /// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`](Store::bulk_extend) if you plan to add ten of millions of triples.
+    /// Adds a set of quads to this store.
     pub fn extend<'b>(
         &mut self,
         quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>,
@@ -1346,6 +1265,148 @@ impl Iterator for GraphNameIter {
     }
 }

+/// A bulk loader able to load a lot of data quickly into the store.
+///
+/// Warning: The operations provided here are not atomic.
+/// If the operation fails in the middle, only a part of the data may be written to the store.
+/// Results might get weird if you delete data during the loading process.
+///
+/// Warning: It is optimized for speed. It uses multiple threads and GBs of RAM on large files.
+///
+/// Usage example with loading a dataset:
+/// ```
+/// use oxigraph::store::Store;
+/// use oxigraph::io::DatasetFormat;
+/// use oxigraph::model::*;
+///
+/// let store = Store::new()?;
+///
+/// // quads file insertion
+/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
+/// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
+///
+/// // we inspect the store contents
+/// let ex = NamedNodeRef::new("http://example.com")?;
+/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
+/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
+/// ```
+#[cfg(not(target_arch = "wasm32"))]
+pub struct BulkLoader {
+    storage: StorageBulkLoader,
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+impl BulkLoader {
+    /// Adds a `callback` evaluated from time to time with the number of loaded triples.
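+    ///
+    /// Usage example:
+    /// ```
+    /// use oxigraph::store::Store;
+    /// use oxigraph::io::DatasetFormat;
+    ///
+    /// let store = Store::new()?;
+    /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
+    /// store.bulk_loader()
+    ///     .on_progress(|size| println!("{} triples loaded", size))
+    ///     .load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
+    /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
+    /// ```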
+    pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self {
+        Self {
+            storage: self.storage.on_progress(callback),
+        }
+    }
+
+    /// Loads a dataset file using the bulk loader.
+    ///
+    /// This function is optimized for large dataset loading speed. For small files, [`Store::load_dataset`] might be more convenient.
+    ///
+    /// Warning: This method is not atomic.
+    /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
+    /// Results might get weird if you delete data during the loading process.
+    ///
+    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
+    ///
+    /// Usage example:
+    /// ```
+    /// use oxigraph::store::Store;
+    /// use oxigraph::io::DatasetFormat;
+    /// use oxigraph::model::*;
+    ///
+    /// let store = Store::new()?;
+    ///
+    /// // insertion
+    /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
+    /// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
+    ///
+    /// // we inspect the store contents
+    /// let ex = NamedNodeRef::new("http://example.com")?;
+    /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
+    /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
+    /// ```
+    pub fn load_dataset(
+        &self,
+        reader: impl BufRead,
+        format: DatasetFormat,
+        base_iri: Option<&str>,
+    ) -> Result<(), LoaderError> {
+        let mut parser = DatasetParser::from_format(format);
+        if let Some(base_iri) = base_iri {
+            parser = parser
+                .with_base_iri(base_iri)
+                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
+        }
+        self.storage.load(parser.read_quads(reader)?)
+    }
+
+    /// Loads a graph file using the bulk loader.
+    ///
+    /// This function is optimized for large graph loading speed. For small files, [`Store::load_graph`] might be more convenient.
+    ///
+    /// Warning: This method is not atomic.
+    /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
+    /// Results might get weird if you delete data during the loading process.
+    ///
+    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
+    ///
+    /// Usage example:
+    /// ```
+    /// use oxigraph::store::Store;
+    /// use oxigraph::io::GraphFormat;
+    /// use oxigraph::model::*;
+    ///
+    /// let store = Store::new()?;
+    ///
+    /// // insertion
+    /// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
+    /// store.bulk_loader().load_graph(file.as_ref(), GraphFormat::NTriples, GraphNameRef::DefaultGraph, None)?;
+    ///
+    /// // we inspect the store contents
+    /// let ex = NamedNodeRef::new("http://example.com")?;
+    /// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
+    /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
+    /// ```
+    pub fn load_graph<'a>(
+        &self,
+        reader: impl BufRead,
+        format: GraphFormat,
+        to_graph_name: impl Into<GraphNameRef<'a>>,
+        base_iri: Option<&str>,
+    ) -> Result<(), LoaderError> {
+        let mut parser = GraphParser::from_format(format);
+        if let Some(base_iri) = base_iri {
+            parser = parser
+                .with_base_iri(base_iri)
+                .map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
+        }
+        let to_graph_name = to_graph_name.into();
+        self.storage.load(
+            parser
+                .read_triples(reader)?
+                .map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))),
+        )
+    }
+
+    /// Adds a set of quads using the bulk loader.
+    ///
+    /// Warning: This method is not atomic.
+    /// If the process fails in the middle of the file, only a part of the data may be written to the store.
+    /// Results might get weird if you delete data during the loading process.
+    ///
+    /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
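+    ///
+    /// Usage example:
+    /// ```
+    /// use oxigraph::store::Store;
+    /// use oxigraph::model::*;
+    ///
+    /// let store = Store::new()?;
+    /// let ex = NamedNode::new("http://example.com")?;
+    /// let quad = Quad::new(ex.clone(), ex.clone(), ex, GraphName::DefaultGraph);
+    /// store.bulk_loader().load_quads(std::iter::once(quad.clone()))?;
+    /// assert!(store.contains(&quad)?);
+    /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
+    /// ```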
+    pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
+        self.storage
+            .load::<StorageError, _, _>(quads.into_iter().map(Ok))
+    }
+}
+
 #[test]
 fn store() -> Result<(), StorageError> {
     use crate::model::*;
diff --git a/lib/tests/store.rs b/lib/tests/store.rs
index 578bc860..f34d0ac0 100644
--- a/lib/tests/store.rs
+++ b/lib/tests/store.rs
@@ -113,7 +113,7 @@ fn test_load_graph() -> Result<(), Box<dyn Error>> {
 #[test]
 fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
     let store = Store::new()?;
-    store.bulk_load_graph(
+    store.bulk_loader().load_graph(
         Cursor::new(DATA),
         GraphFormat::Turtle,
         GraphNameRef::DefaultGraph,
@@ -142,7 +142,9 @@ fn test_load_dataset() -> Result<(), Box<dyn Error>> {
 #[test]
 fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
     let store = Store::new().unwrap();
-    store.bulk_load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?;
+    store
+        .bulk_loader()
+        .load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?;
     for q in quads(NamedNodeRef::new_unchecked(
         "http://www.wikidata.org/wiki/Special:EntityData/Q90",
     )) {
@@ -233,7 +235,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dyn Error>> {
     let store = Store::open(&dir.0)?;
     remove_dir_all(&dir.0)?;
     assert!(store
-        .bulk_extend(once(Quad {
+        .bulk_loader()
+        .load_quads(once(Quad {
             subject: NamedNode::new_unchecked("http://example.com/s").into(),
             predicate: NamedNode::new_unchecked("http://example.com/p"),
             object: NamedNode::new_unchecked("http://example.com/o").into(),
diff --git a/python/src/store.rs b/python/src/store.rs
index ef4b7538..45993ee8 100644
--- a/python/src/store.rs
+++ b/python/src/store.rs
@@ -376,7 +376,8 @@ impl PyStore {
         py.allow_threads(|| {
             if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
                 self.inner
-                    .bulk_load_graph(
+                    .bulk_loader()
+                    .load_graph(
                         input,
                         graph_format,
                         &to_graph_name.unwrap_or(GraphName::DefaultGraph),
@@ -390,7 +391,8 @@ impl PyStore {
                 ));
             }
             self.inner
-                .bulk_load_dataset(input, dataset_format, base_iri)
+                .bulk_loader()
+                .load_dataset(input, dataset_format, base_iri)
                 .map_err(map_loader_error)
         } else {
             Err(PyValueError::new_err(format!(
diff --git a/server/src/main.rs b/server/src/main.rs
index a43ce4c1..408377b3 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -28,7 +28,7 @@ use std::path::PathBuf;
 use std::rc::Rc;
 use std::str::FromStr;
 use std::thread::{spawn, JoinHandle};
-use std::time::Duration;
+use std::time::{Duration, Instant};
 use url::form_urlencoded;

 const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
@@ -78,20 +78,29 @@ pub fn main() -> std::io::Result<()> {
                 let store = store.clone();
                 let file = file.to_string();
                 spawn(move || {
-                    let format = file
+                    let f = file.clone();
+                    let start = Instant::now();
+                    let loader = store.bulk_loader().on_progress(move |size| {
+                        let elapsed = start.elapsed();
+                        // .max(1) avoids a division by zero during the first second
+                        println!("{} triples loaded in {}s ({} t/s) from {}", size, elapsed.as_secs(), size / elapsed.as_secs().max(1), f)
+                    });
+                    let reader = BufReader::new(File::open(&file)?);
+                    if let Some(format) = file
                         .rsplit_once('.')
-                        .and_then(|(_, extension)| {
-                            DatasetFormat::from_extension(extension)
-                                .or_else(|| GraphFormat::from_extension(extension)?.try_into().ok())
-                        })
-                        .ok_or_else(|| {
-                            io::Error::new(
-                                ErrorKind::InvalidInput,
-                                "The server is not able to guess the file format of {} from its extension",
-                            )
-                        })?;
-                    store.bulk_load_dataset(BufReader::new(File::open(file)?), format, None)?;
-                    Ok(())
+                        .and_then(|(_, extension)| DatasetFormat::from_extension(extension)) {
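+                        // the extension names a dataset format (N-Quads or TriG)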
+                        loader.load_dataset(reader, format, None)?;
+                        Ok(())
+                    } else if let Some(format) = file
+                        .rsplit_once('.')
+                        .and_then(|(_, extension)| GraphFormat::from_extension(extension)) {
+                        loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?;
+                        Ok(())
+                    } else {
+                        Err(io::Error::new(
+                            ErrorKind::InvalidInput,
+                            format!("The server is not able to guess the file format of {} from its extension", file),
+                        ))
+                    }
                 })
             }).collect::<Vec<JoinHandle<io::Result<()>>>>();
        for handle in handles {