From ba28fc26e76f4bc751c88517cf555b484ffbe2d3 Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 22 Mar 2023 19:15:08 +0100 Subject: [PATCH] Bulk loader: pre-allocate some buffers --- lib/src/storage/mod.rs | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 0d5f7b1b..2539f35e 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -21,7 +21,7 @@ use std::collections::VecDeque; use std::collections::{HashMap, HashSet}; use std::error::Error; #[cfg(not(target_family = "wasm"))] -use std::mem::take; +use std::mem::{swap, take}; #[cfg(not(target_family = "wasm"))] use std::path::{Path, PathBuf}; #[cfg(not(target_family = "wasm"))] @@ -1266,6 +1266,7 @@ impl StorageBulkLoader { &done_counter, &mut done_and_displayed_counter, num_threads, + batch_size, )?; } } @@ -1275,6 +1276,7 @@ impl StorageBulkLoader { &done_counter, &mut done_and_displayed_counter, num_threads, + batch_size, )?; for thread in threads { thread.join().unwrap()?; @@ -1290,6 +1292,7 @@ impl StorageBulkLoader { done_counter: &Arc, done_and_displayed_counter: &mut u64, num_threads: usize, + batch_size: usize, ) -> Result<(), StorageError> { self.on_possible_progress(done_counter, done_and_displayed_counter); // We avoid to have too many threads @@ -1299,11 +1302,12 @@ impl StorageBulkLoader { self.on_possible_progress(done_counter, done_and_displayed_counter); } } - let buffer = take(buffer); + let mut buffer_to_load = Vec::with_capacity(batch_size); + swap(buffer, &mut buffer_to_load); let storage = self.storage.clone(); let done_counter_clone = done_counter.clone(); threads.push_back(spawn(move || { - FileBulkLoader::new(storage).load(buffer, &done_counter_clone) + FileBulkLoader::new(storage, batch_size).load(buffer_to_load, &done_counter_clone) })); self.on_possible_progress(done_counter, done_and_displayed_counter); Ok(()) @@ -1332,21 +1336,17 @@ struct FileBulkLoader { #[cfg(not(target_family = "wasm"))] impl FileBulkLoader { - fn new(storage: Storage) -> Self { + fn new(storage: Storage, batch_size: usize) -> Self { Self { storage, - id2str: HashMap::default(), - quads: HashSet::default(), - triples: HashSet::default(), + id2str: HashMap::with_capacity(3 * batch_size), + quads: HashSet::with_capacity(batch_size), + triples: HashSet::with_capacity(batch_size), graphs: HashSet::default(), } } - fn load( - &mut self, - quads: impl IntoIterator, - counter: &AtomicU64, - ) -> Result<(), StorageError> { + fn load(&mut self, quads: Vec, counter: &AtomicU64) -> Result<(), StorageError> { self.encode(quads)?; let size = self.triples.len() + self.quads.len(); self.save()?; @@ -1354,7 +1354,7 @@ impl FileBulkLoader { Ok(()) } - fn encode(&mut self, quads: impl IntoIterator) -> Result<(), StorageError> { + fn encode(&mut self, quads: Vec) -> Result<(), StorageError> { for quad in quads { let encoded = EncodedQuad::from(quad.as_ref()); if quad.graph_name.is_default_graph() {