|
|
@ -26,7 +26,7 @@ use std::mem::{swap, take}; |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
use std::path::{Path, PathBuf}; |
|
|
|
use std::path::{Path, PathBuf}; |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
use std::sync::atomic::{AtomicU64, Ordering}; |
|
|
|
use std::sync::Mutex; |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
#[cfg(not(target_family = "wasm"))] |
|
|
|
use std::{io, thread}; |
|
|
|
use std::{io, thread}; |
|
|
|
|
|
|
|
|
|
|
@ -1249,7 +1249,7 @@ impl StorageBulkLoader { |
|
|
|
) |
|
|
|
) |
|
|
|
.into()); |
|
|
|
.into()); |
|
|
|
} |
|
|
|
} |
|
|
|
let done_counter = AtomicU64::new(0); |
|
|
|
let done_counter = Mutex::new(0); |
|
|
|
let mut done_and_displayed_counter = 0; |
|
|
|
let mut done_and_displayed_counter = 0; |
|
|
|
thread::scope(|thread_scope| { |
|
|
|
thread::scope(|thread_scope| { |
|
|
|
let mut threads = VecDeque::with_capacity(num_threads - 1); |
|
|
|
let mut threads = VecDeque::with_capacity(num_threads - 1); |
|
|
@ -1280,7 +1280,7 @@ impl StorageBulkLoader { |
|
|
|
)?; |
|
|
|
)?; |
|
|
|
for thread in threads { |
|
|
|
for thread in threads { |
|
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??; |
|
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??; |
|
|
|
self.on_possible_progress(&done_counter, &mut done_and_displayed_counter); |
|
|
|
self.on_possible_progress(&done_counter, &mut done_and_displayed_counter)?; |
|
|
|
} |
|
|
|
} |
|
|
|
Ok(()) |
|
|
|
Ok(()) |
|
|
|
}) |
|
|
|
}) |
|
|
@ -1291,17 +1291,17 @@ impl StorageBulkLoader { |
|
|
|
buffer: &mut Vec<Quad>, |
|
|
|
buffer: &mut Vec<Quad>, |
|
|
|
threads: &mut VecDeque<thread::ScopedJoinHandle<'scope, Result<(), StorageError>>>, |
|
|
|
threads: &mut VecDeque<thread::ScopedJoinHandle<'scope, Result<(), StorageError>>>, |
|
|
|
thread_scope: &'scope thread::Scope<'scope, '_>, |
|
|
|
thread_scope: &'scope thread::Scope<'scope, '_>, |
|
|
|
done_counter: &'scope AtomicU64, |
|
|
|
done_counter: &'scope Mutex<u64>, |
|
|
|
done_and_displayed_counter: &mut u64, |
|
|
|
done_and_displayed_counter: &mut u64, |
|
|
|
num_threads: usize, |
|
|
|
num_threads: usize, |
|
|
|
batch_size: usize, |
|
|
|
batch_size: usize, |
|
|
|
) -> Result<(), StorageError> { |
|
|
|
) -> Result<(), StorageError> { |
|
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter); |
|
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter)?; |
|
|
|
// We avoid to have too many threads
|
|
|
|
// We avoid to have too many threads
|
|
|
|
if threads.len() >= num_threads { |
|
|
|
if threads.len() >= num_threads { |
|
|
|
if let Some(thread) = threads.pop_front() { |
|
|
|
if let Some(thread) = threads.pop_front() { |
|
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??; |
|
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??; |
|
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter); |
|
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter)?; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
let mut buffer_to_load = Vec::with_capacity(batch_size); |
|
|
|
let mut buffer_to_load = Vec::with_capacity(batch_size); |
|
|
@ -1313,15 +1313,22 @@ impl StorageBulkLoader { |
|
|
|
Ok(()) |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) { |
|
|
|
fn on_possible_progress( |
|
|
|
let new_counter = done.load(Ordering::Relaxed); |
|
|
|
&self, |
|
|
|
let display_step = u64::try_from(DEFAULT_BULK_LOAD_BATCH_SIZE).unwrap(); |
|
|
|
done: &Mutex<u64>, |
|
|
|
|
|
|
|
done_and_displayed: &mut u64, |
|
|
|
|
|
|
|
) -> Result<(), StorageError> { |
|
|
|
|
|
|
|
let new_counter = *done |
|
|
|
|
|
|
|
.lock() |
|
|
|
|
|
|
|
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Mutex poisoned"))?; |
|
|
|
|
|
|
|
let display_step = DEFAULT_BULK_LOAD_BATCH_SIZE as u64; |
|
|
|
if new_counter / display_step > *done_and_displayed / display_step { |
|
|
|
if new_counter / display_step > *done_and_displayed / display_step { |
|
|
|
for hook in &self.hooks { |
|
|
|
for hook in &self.hooks { |
|
|
|
hook(new_counter); |
|
|
|
hook(new_counter); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
*done_and_displayed = new_counter; |
|
|
|
*done_and_displayed = new_counter; |
|
|
|
|
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -1346,11 +1353,14 @@ impl<'a> FileBulkLoader<'a> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn load(&mut self, quads: Vec<Quad>, counter: &AtomicU64) -> Result<(), StorageError> { |
|
|
|
fn load(&mut self, quads: Vec<Quad>, counter: &Mutex<u64>) -> Result<(), StorageError> { |
|
|
|
self.encode(quads)?; |
|
|
|
self.encode(quads)?; |
|
|
|
let size = self.triples.len() + self.quads.len(); |
|
|
|
let size = self.triples.len() + self.quads.len(); |
|
|
|
self.save()?; |
|
|
|
self.save()?; |
|
|
|
counter.fetch_add(size.try_into().unwrap_or(u64::MAX), Ordering::Relaxed); |
|
|
|
*counter |
|
|
|
|
|
|
|
.lock() |
|
|
|
|
|
|
|
.map_err(|_| io::Error::new(io::ErrorKind::Other, "Mutex poisoned"))? += |
|
|
|
|
|
|
|
size.try_into().unwrap_or(u64::MAX); |
|
|
|
Ok(()) |
|
|
|
Ok(()) |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|