From 9e969cd8ebc7976825b4d48d79cd61fa3676755f Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 12 Feb 2022 11:59:47 +0100 Subject: [PATCH] Adds methods to configure bulk loader system consumption --- lib/src/storage/mod.rs | 50 +++++++++++++++++++++++++++++++++++------- lib/src/store.rs | 41 ++++++++++++++++++++++++++++++---- server/src/main.rs | 14 +++++++----- 3 files changed, 87 insertions(+), 18 deletions(-) diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 9a775c25..0de9658f 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -13,6 +13,7 @@ use crate::storage::numeric_encoder::{ insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup, }; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; +use std::cmp::{max, min}; use std::collections::VecDeque; #[cfg(not(target_arch = "wasm32"))] use std::collections::{HashMap, HashSet}; @@ -46,7 +47,7 @@ const DOSP_CF: &str = "dosp"; const GRAPHS_CF: &str = "graphs"; const DEFAULT_CF: &str = "default"; #[cfg(not(target_arch = "wasm32"))] -const BULK_LOAD_BATCH_SIZE: usize = 1_000_000; +const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; /// Low level storage primitives #[derive(Clone)] @@ -1157,7 +1158,8 @@ impl<'a> StorageWriter<'a> { pub struct StorageBulkLoader { storage: Storage, hooks: Vec>, - num_threads: usize, + num_threads: Option, + max_memory_size: Option, } #[cfg(not(target_arch = "wasm32"))] @@ -1166,10 +1168,21 @@ impl StorageBulkLoader { Self { storage, hooks: Vec::new(), - num_threads: num_cpus::get() * 4, + num_threads: None, + max_memory_size: None, } } + pub fn set_num_threads(mut self, num_threads: usize) -> Self { + self.num_threads = Some(num_threads); + self + } + + pub fn set_max_memory_size_in_bytes(mut self, max_memory_size: usize) -> Self { + self.max_memory_size = Some(max_memory_size); + self + } + pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self { self.hooks.push(Box::new(callback)); self @@ -1179,19 +1192,38 @@ impl 
StorageBulkLoader { &self, quads: I, ) -> Result<(), EO> { - let mut threads = VecDeque::with_capacity(self.num_threads); - let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE); + let num_threads = max( + if let Some(num_threads) = self.num_threads { + num_threads + } else if let Some(max_memory_size) = self.max_memory_size { + min( + num_cpus::get(), + max_memory_size / 1000 / DEFAULT_BULK_LOAD_BATCH_SIZE, + ) + } else { + num_cpus::get() + }, + 2, + ); + let batch_size = if let Some(max_memory_size) = self.max_memory_size { + max(1000, max_memory_size / 1000 / num_threads) + } else { + DEFAULT_BULK_LOAD_BATCH_SIZE + }; + let mut threads = VecDeque::with_capacity(num_threads - 1); + let mut buffer = Vec::with_capacity(batch_size); let done_counter = Arc::new(AtomicU64::new(0)); let mut done_and_displayed_counter = 0; for quad in quads { let quad = quad?; buffer.push(quad); - if buffer.len() >= BULK_LOAD_BATCH_SIZE { + if buffer.len() >= batch_size { self.spawn_load_thread( &mut buffer, &mut threads, &done_counter, &mut done_and_displayed_counter, + num_threads, )?; } } @@ -1200,6 +1232,7 @@ impl StorageBulkLoader { &mut threads, &done_counter, &mut done_and_displayed_counter, + num_threads, )?; for thread in threads { thread.join().unwrap()?; @@ -1214,10 +1247,11 @@ impl StorageBulkLoader { threads: &mut VecDeque>>, done_counter: &Arc, done_and_displayed_counter: &mut u64, + num_threads: usize, ) -> Result<(), StorageError> { self.on_possible_progress(done_counter, done_and_displayed_counter); // We avoid to have too many threads - if threads.len() >= self.num_threads { + if threads.len() >= num_threads { if let Some(thread) = threads.pop_front() { thread.join().unwrap()?; self.on_possible_progress(done_counter, done_and_displayed_counter); @@ -1235,7 +1269,7 @@ impl StorageBulkLoader { fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) { let new_counter = done.fetch_max(*done_and_displayed, Ordering::Relaxed); - let display_step = 
u64::try_from(BULK_LOAD_BATCH_SIZE).unwrap(); + let display_step = u64::try_from(DEFAULT_BULK_LOAD_BATCH_SIZE).unwrap(); if new_counter % display_step > *done_and_displayed % display_step { for hook in &self.hooks { hook(new_counter); diff --git a/lib/src/store.rs b/lib/src/store.rs index 879fd79c..ec374535 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -1271,7 +1271,12 @@ impl Iterator for GraphNameIter { /// If the operation fails in the middle, only a part of the data may be written to the store. /// Results might get weird if you delete data during the loading process. /// -/// Warning: It is optimized for speed. It uses multiple threads and GBs of RAM on large files. +/// Warning: It is optimized for speed. +/// Memory usage is configurable using [`BulkLoader::set_max_memory_size_in_bytes`] +/// and the number of used threads with [`BulkLoader::set_num_threads`]. +/// By default the memory consumption target (excluding the system and RocksDB internal consumption) +/// is 1GB per thread and the number of threads is set to the number of logical CPU cores provided by the system. +/// These targets are considered per loaded file. /// /// Usage example with loading a dataset: /// ``` @@ -1297,6 +1302,34 @@ pub struct BulkLoader { #[cfg(not(target_arch = "wasm32"))] impl BulkLoader { + /// Sets the maximal number of threads to be used by the bulk loader per operation. + /// + /// This number must be at least 2 (one for parsing and one for loading). + /// + /// By default this is the number of logical CPU cores provided by the system except if + /// [`BulkLoader::set_max_memory_size_in_bytes`] is set. In this case at least 1GB is reserved + /// per used thread. + pub fn set_num_threads(self, num_threads: usize) -> Self { + Self { + storage: self.storage.set_num_threads(num_threads), + } + } + + /// Sets the maximal amount of memory used by this operation. + /// + /// This number must be at least a few megabytes per thread.
+ /// + /// Memory used by RocksDB and the system is not taken into account in this limit. + /// Note that depending on the system behavior this amount might never be reached. + /// + /// By default, at most 1GB per used thread is used + /// (i.e. at most as many GBs as the number of available logical CPU cores in total). + pub fn set_max_memory_size_in_bytes(self, max_memory_size: usize) -> Self { + Self { + storage: self.storage.set_max_memory_size_in_bytes(max_memory_size), + } + } + /// Adds a `callback` evaluated from time to time with the number of loaded triples. pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self { Self { @@ -1312,7 +1345,7 @@ impl BulkLoader { /// If the parsing fails in the middle of the file, only a part of it may be written to the store. /// Results might get weird if you delete data during the loading process. /// - /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. + /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details. /// /// Usage example: /// ``` @@ -1354,7 +1387,7 @@ impl BulkLoader { /// If the parsing fails in the middle of the file, only a part of it may be written to the store. /// Results might get weird if you delete data during the loading process. /// - /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. + /// Warning: This method is optimized for speed. See [the struct](BulkLoader) documentation for more details. /// /// Usage example: /// ``` @@ -1400,7 +1433,7 @@ impl BulkLoader { /// If the process fails in the middle of the file, only a part of the data may be written to the store. /// Results might get weird if you delete data during the loading process. /// - /// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files. + /// Warning: This method is optimized for speed.
See [the struct](BulkLoader) documentation for more details. pub fn load_quads(&self, quads: impl IntoIterator) -> Result<(), StorageError> { self.storage .load::(quads.into_iter().map(Ok)) diff --git a/server/src/main.rs b/server/src/main.rs index 408377b3..bb01e144 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -38,9 +38,9 @@ const LOGO: &str = include_str!("../logo.svg"); #[derive(Parser)] #[clap(about, version)] -/// Oxigraph SPARQL server +/// Oxigraph SPARQL server. struct Args { - /// Directory in which persist the data + /// Directory in which to persist the data. #[clap(short, long, parse(from_os_str), global = true)] location: Option, #[clap(subcommand)] @@ -49,15 +49,17 @@ struct Args { #[derive(Subcommand)] enum Command { - /// Start Oxigraph HTTP server + /// Start Oxigraph HTTP server. Serve { - /// Host and port to listen to + /// Host and port to listen to. #[clap(short, long, default_value = "localhost:7878", global = true)] bind: String, }, - /// Load file(s) into the store + /// Load file(s) into the store. Load { - /// file(s) to load + /// File(s) to load. + /// + /// If multiple files are provided they are loaded in parallel. #[clap(short, long, global = true)] file: Vec, },