diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 8a92e77f..f1c81414 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -28,11 +28,7 @@ use std::path::{Path, PathBuf}; #[cfg(not(target_family = "wasm"))] use std::sync::atomic::{AtomicU64, Ordering}; #[cfg(not(target_family = "wasm"))] -use std::sync::Arc; -#[cfg(not(target_family = "wasm"))] -use std::thread::spawn; -#[cfg(not(target_family = "wasm"))] -use std::thread::JoinHandle; +use std::thread; mod backend; mod binary_encoder; @@ -1253,44 +1249,49 @@ impl StorageBulkLoader { ) .into()); } - let mut threads = VecDeque::with_capacity(num_threads - 1); - let mut buffer = Vec::with_capacity(batch_size); - let done_counter = Arc::new(AtomicU64::new(0)); + let done_counter = AtomicU64::new(0); let mut done_and_displayed_counter = 0; - for quad in quads { - let quad = quad?; - buffer.push(quad); - if buffer.len() >= batch_size { - self.spawn_load_thread( - &mut buffer, - &mut threads, - &done_counter, - &mut done_and_displayed_counter, - num_threads, - batch_size, - )?; + thread::scope(|thread_scope| { + let mut threads = VecDeque::with_capacity(num_threads - 1); + let mut buffer = Vec::with_capacity(batch_size); + for quad in quads { + let quad = quad?; + buffer.push(quad); + if buffer.len() >= batch_size { + self.spawn_load_thread( + &mut buffer, + &mut threads, + thread_scope, + &done_counter, + &mut done_and_displayed_counter, + num_threads, + batch_size, + )?; + } } - } - self.spawn_load_thread( - &mut buffer, - &mut threads, - &done_counter, - &mut done_and_displayed_counter, - num_threads, - batch_size, - )?; - for thread in threads { - thread.join().unwrap()?; - self.on_possible_progress(&done_counter, &mut done_and_displayed_counter); - } - Ok(()) + self.spawn_load_thread( + &mut buffer, + &mut threads, + thread_scope, + &done_counter, + &mut done_and_displayed_counter, + num_threads, + batch_size, + )?; + for thread in threads { + thread.join().unwrap()?; + self.on_possible_progress(&done_counter, &mut done_and_displayed_counter); + } + Ok(()) + }) } - fn spawn_load_thread( - &self, + fn spawn_load_thread<'scope>( + &'scope self, buffer: &mut Vec, - threads: &mut VecDeque>>, - done_counter: &Arc, + threads: &mut VecDeque>>, + thread_scope: &'scope thread::Scope<'scope, '_>, + done_counter: &'scope AtomicU64, done_and_displayed_counter: &mut u64, num_threads: usize, batch_size: usize, @@ -1305,10 +1306,9 @@ impl StorageBulkLoader { } let mut buffer_to_load = Vec::with_capacity(batch_size); swap(buffer, &mut buffer_to_load); - let storage = self.storage.clone(); - let done_counter_clone = Arc::clone(done_counter); - threads.push_back(spawn(move || { - FileBulkLoader::new(storage, batch_size).load(buffer_to_load, &done_counter_clone) + let storage = &self.storage; + threads.push_back(thread_scope.spawn(move || { + FileBulkLoader::new(storage, batch_size).load(buffer_to_load, done_counter) })); Ok(()) } @@ -1326,8 +1326,8 @@ impl StorageBulkLoader { } #[cfg(not(target_family = "wasm"))] -struct FileBulkLoader { - storage: Storage, +struct FileBulkLoader<'a> { + storage: &'a Storage, id2str: HashMap>, quads: HashSet, triples: HashSet, @@ -1335,8 +1335,8 @@ struct FileBulkLoader { } #[cfg(not(target_family = "wasm"))] -impl FileBulkLoader { - fn new(storage: Storage, batch_size: usize) -> Self { +impl<'a> FileBulkLoader<'a> { + fn new(storage: &'a Storage, batch_size: usize) -> Self { Self { storage, id2str: HashMap::with_capacity(3 * batch_size),