Adds methods to configure bulk loader system consumption

pull/190/head
Tpt 3 years ago
parent aaed962252
commit 9e969cd8eb
  1. 50
      lib/src/storage/mod.rs
  2. 41
      lib/src/store.rs
  3. 14
      server/src/main.rs

@ -13,6 +13,7 @@ use crate::storage::numeric_encoder::{
insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup, insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup,
}; };
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
use std::cmp::{max, min};
use std::collections::VecDeque; use std::collections::VecDeque;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -46,7 +47,7 @@ const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs"; const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default"; const DEFAULT_CF: &str = "default";
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
const BULK_LOAD_BATCH_SIZE: usize = 1_000_000; const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
/// Low level storage primitives /// Low level storage primitives
#[derive(Clone)] #[derive(Clone)]
@ -1157,7 +1158,8 @@ impl<'a> StorageWriter<'a> {
pub struct StorageBulkLoader { pub struct StorageBulkLoader {
storage: Storage, storage: Storage,
hooks: Vec<Box<dyn Fn(u64)>>, hooks: Vec<Box<dyn Fn(u64)>>,
num_threads: usize, num_threads: Option<usize>,
max_memory_size: Option<usize>,
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -1166,8 +1168,19 @@ impl StorageBulkLoader {
Self { Self {
storage, storage,
hooks: Vec::new(), hooks: Vec::new(),
num_threads: num_cpus::get() * 4, num_threads: None,
max_memory_size: None,
}
} }
pub fn set_num_threads(mut self, num_threads: usize) -> Self {
self.num_threads = Some(num_threads);
self
}
pub fn set_max_memory_size_in_bytes(mut self, max_memory_size: usize) -> Self {
self.max_memory_size = Some(max_memory_size);
self
} }
pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self { pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
@ -1179,19 +1192,38 @@ impl StorageBulkLoader {
&self, &self,
quads: I, quads: I,
) -> Result<(), EO> { ) -> Result<(), EO> {
let mut threads = VecDeque::with_capacity(self.num_threads); let num_threads = max(
let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE); if let Some(num_threads) = self.num_threads {
num_threads
} else if let Some(max_memory_size) = self.max_memory_size {
min(
num_cpus::get(),
max_memory_size / 1000 / DEFAULT_BULK_LOAD_BATCH_SIZE,
)
} else {
num_cpus::get()
},
2,
);
let batch_size = if let Some(max_memory_size) = self.max_memory_size {
max(1000, max_memory_size / 1000 / num_threads)
} else {
DEFAULT_BULK_LOAD_BATCH_SIZE
};
let mut threads = VecDeque::with_capacity(num_threads - 1);
let mut buffer = Vec::with_capacity(batch_size);
let done_counter = Arc::new(AtomicU64::new(0)); let done_counter = Arc::new(AtomicU64::new(0));
let mut done_and_displayed_counter = 0; let mut done_and_displayed_counter = 0;
for quad in quads { for quad in quads {
let quad = quad?; let quad = quad?;
buffer.push(quad); buffer.push(quad);
if buffer.len() >= BULK_LOAD_BATCH_SIZE { if buffer.len() >= batch_size {
self.spawn_load_thread( self.spawn_load_thread(
&mut buffer, &mut buffer,
&mut threads, &mut threads,
&done_counter, &done_counter,
&mut done_and_displayed_counter, &mut done_and_displayed_counter,
num_threads,
)?; )?;
} }
} }
@ -1200,6 +1232,7 @@ impl StorageBulkLoader {
&mut threads, &mut threads,
&done_counter, &done_counter,
&mut done_and_displayed_counter, &mut done_and_displayed_counter,
num_threads,
)?; )?;
for thread in threads { for thread in threads {
thread.join().unwrap()?; thread.join().unwrap()?;
@ -1214,10 +1247,11 @@ impl StorageBulkLoader {
threads: &mut VecDeque<JoinHandle<Result<(), StorageError>>>, threads: &mut VecDeque<JoinHandle<Result<(), StorageError>>>,
done_counter: &Arc<AtomicU64>, done_counter: &Arc<AtomicU64>,
done_and_displayed_counter: &mut u64, done_and_displayed_counter: &mut u64,
num_threads: usize,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
self.on_possible_progress(done_counter, done_and_displayed_counter); self.on_possible_progress(done_counter, done_and_displayed_counter);
// We avoid to have too many threads // We avoid to have too many threads
if threads.len() >= self.num_threads { if threads.len() >= num_threads {
if let Some(thread) = threads.pop_front() { if let Some(thread) = threads.pop_front() {
thread.join().unwrap()?; thread.join().unwrap()?;
self.on_possible_progress(done_counter, done_and_displayed_counter); self.on_possible_progress(done_counter, done_and_displayed_counter);
@ -1235,7 +1269,7 @@ impl StorageBulkLoader {
fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) { fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) {
let new_counter = done.fetch_max(*done_and_displayed, Ordering::Relaxed); let new_counter = done.fetch_max(*done_and_displayed, Ordering::Relaxed);
let display_step = u64::try_from(BULK_LOAD_BATCH_SIZE).unwrap(); let display_step = u64::try_from(DEFAULT_BULK_LOAD_BATCH_SIZE).unwrap();
if new_counter % display_step > *done_and_displayed % display_step { if new_counter % display_step > *done_and_displayed % display_step {
for hook in &self.hooks { for hook in &self.hooks {
hook(new_counter); hook(new_counter);

@ -1271,7 +1271,12 @@ impl Iterator for GraphNameIter {
/// If the operation fails in the middle, only a part of the data may be written to the store. /// If the operation fails in the middle, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process. /// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: It is optimized for speed. It uses multiple threads and GBs of RAM on large files. /// Warning: It is optimized for speed.
/// Memory usage is configurable using [`BulkLoader::set_max_memory_size_in_bytes`]
/// and the number of used threads with [`BulkLoader::set_num_threads`].
/// By default the memory consumption target (excluding the system and RocksDB internal consumption)
/// is 1GB per thread and the number of threads is set to the number of logical CPU cores provided by the system.
/// These targets are considered per loaded file.
/// ///
/// Usage example with loading a dataset: /// Usage example with loading a dataset:
/// ``` /// ```
@ -1297,6 +1302,34 @@ pub struct BulkLoader {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
impl BulkLoader { impl BulkLoader {
/// Sets the maximal number of threads to be used by the bulk loader per operation.
///
/// This number must be at last 2 (one for parsing and one for loading).
///
/// By default this is the number of logical CPU cores provided by the system except if
/// [`BulkLoader::set_max_memory_size_in_bytes`] is set. In this case at least one 1GB is reserved
/// per used thread.
pub fn set_num_threads(self, num_threads: usize) -> Self {
Self {
storage: self.storage.set_num_threads(num_threads),
}
}
/// Sets the maximal number of memory used by this operation.
///
/// This number must be at last a few megabytes per thread.
///
/// Memory used by RocksDB and the system is not taken into account in this limit.
/// Note that depending on the system behavior this amount might never be reached.
///
/// By default, at most 1GB per used thread is used
/// (i.e. at most GBs at the number of available logical CPU cores in total).
pub fn set_max_memory_size_in_bytes(self, max_memory_size: usize) -> Self {
Self {
storage: self.storage.set_max_memory_size_in_bytes(max_memory_size),
}
}
/// Adds a `callback` evaluated from time to time with the number of loaded triples. /// Adds a `callback` evaluated from time to time with the number of loaded triples.
pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self { pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self {
Self { Self {
@ -1312,7 +1345,7 @@ impl BulkLoader {
/// If the parsing fails in the middle of the file, only a part of it may be written to the store. /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process. /// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -1354,7 +1387,7 @@ impl BulkLoader {
/// If the parsing fails in the middle of the file, only a part of it may be written to the store. /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process. /// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -1400,7 +1433,7 @@ impl BulkLoader {
/// If the process fails in the middle of the file, only a part of the data may be written to the store. /// If the process fails in the middle of the file, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process. /// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details.
pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> { pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
self.storage self.storage
.load::<StorageError, _, _>(quads.into_iter().map(Ok)) .load::<StorageError, _, _>(quads.into_iter().map(Ok))

@ -38,9 +38,9 @@ const LOGO: &str = include_str!("../logo.svg");
#[derive(Parser)] #[derive(Parser)]
#[clap(about, version)] #[clap(about, version)]
/// Oxigraph SPARQL server /// Oxigraph SPARQL server.
struct Args { struct Args {
/// Directory in which persist the data /// Directory in which persist the data.
#[clap(short, long, parse(from_os_str), global = true)] #[clap(short, long, parse(from_os_str), global = true)]
location: Option<PathBuf>, location: Option<PathBuf>,
#[clap(subcommand)] #[clap(subcommand)]
@ -49,15 +49,17 @@ struct Args {
#[derive(Subcommand)] #[derive(Subcommand)]
enum Command { enum Command {
/// Start Oxigraph HTTP server /// Start Oxigraph HTTP server.
Serve { Serve {
/// Host and port to listen to /// Host and port to listen to.
#[clap(short, long, default_value = "localhost:7878", global = true)] #[clap(short, long, default_value = "localhost:7878", global = true)]
bind: String, bind: String,
}, },
/// Load file(s) into the store /// Load file(s) into the store.
Load { Load {
/// file(s) to load /// file(s) to load.
///
/// If multiple files are provided they are loaded in parallel.
#[clap(short, long, global = true)] #[clap(short, long, global = true)]
file: Vec<String>, file: Vec<String>,
}, },

Loading…
Cancel
Save