Bulk loader: pre-allocate some buffers

speedb
Tpt 2 years ago committed by Thomas Tanon
parent f4b99e6953
commit 7787be6e84
  1. 26
      lib/src/storage/mod.rs

@ -21,7 +21,7 @@ use std::collections::VecDeque;
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::error::Error; use std::error::Error;
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::mem::take; use std::mem::{swap, take};
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
@ -1266,6 +1266,7 @@ impl StorageBulkLoader {
&done_counter, &done_counter,
&mut done_and_displayed_counter, &mut done_and_displayed_counter,
num_threads, num_threads,
batch_size,
)?; )?;
} }
} }
@ -1275,6 +1276,7 @@ impl StorageBulkLoader {
&done_counter, &done_counter,
&mut done_and_displayed_counter, &mut done_and_displayed_counter,
num_threads, num_threads,
batch_size,
)?; )?;
for thread in threads { for thread in threads {
thread.join().unwrap()?; thread.join().unwrap()?;
@ -1290,6 +1292,7 @@ impl StorageBulkLoader {
done_counter: &Arc<AtomicU64>, done_counter: &Arc<AtomicU64>,
done_and_displayed_counter: &mut u64, done_and_displayed_counter: &mut u64,
num_threads: usize, num_threads: usize,
batch_size: usize,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
self.on_possible_progress(done_counter, done_and_displayed_counter); self.on_possible_progress(done_counter, done_and_displayed_counter);
// We avoid to have too many threads // We avoid to have too many threads
@ -1299,11 +1302,12 @@ impl StorageBulkLoader {
self.on_possible_progress(done_counter, done_and_displayed_counter); self.on_possible_progress(done_counter, done_and_displayed_counter);
} }
} }
let buffer = take(buffer); let mut buffer_to_load = Vec::with_capacity(batch_size);
swap(buffer, &mut buffer_to_load);
let storage = self.storage.clone(); let storage = self.storage.clone();
let done_counter_clone = done_counter.clone(); let done_counter_clone = done_counter.clone();
threads.push_back(spawn(move || { threads.push_back(spawn(move || {
FileBulkLoader::new(storage).load(buffer, &done_counter_clone) FileBulkLoader::new(storage, batch_size).load(buffer_to_load, &done_counter_clone)
})); }));
self.on_possible_progress(done_counter, done_and_displayed_counter); self.on_possible_progress(done_counter, done_and_displayed_counter);
Ok(()) Ok(())
@ -1332,21 +1336,17 @@ struct FileBulkLoader {
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
impl FileBulkLoader { impl FileBulkLoader {
fn new(storage: Storage) -> Self { fn new(storage: Storage, batch_size: usize) -> Self {
Self { Self {
storage, storage,
id2str: HashMap::default(), id2str: HashMap::with_capacity(3 * batch_size),
quads: HashSet::default(), quads: HashSet::with_capacity(batch_size),
triples: HashSet::default(), triples: HashSet::with_capacity(batch_size),
graphs: HashSet::default(), graphs: HashSet::default(),
} }
} }
fn load( fn load(&mut self, quads: Vec<Quad>, counter: &AtomicU64) -> Result<(), StorageError> {
&mut self,
quads: impl IntoIterator<Item = Quad>,
counter: &AtomicU64,
) -> Result<(), StorageError> {
self.encode(quads)?; self.encode(quads)?;
let size = self.triples.len() + self.quads.len(); let size = self.triples.len() + self.quads.len();
self.save()?; self.save()?;
@ -1354,7 +1354,7 @@ impl FileBulkLoader {
Ok(()) Ok(())
} }
fn encode(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> { fn encode(&mut self, quads: Vec<Quad>) -> Result<(), StorageError> {
for quad in quads { for quad in quads {
let encoded = EncodedQuad::from(quad.as_ref()); let encoded = EncodedQuad::from(quad.as_ref());
if quad.graph_name.is_default_graph() { if quad.graph_name.is_default_graph() {

Loading…
Cancel
Save