Bulk loader: reduces default system parameters

By default, targets 1GB of RAM and 2 threads.

Data parsing is in most cases slower than ingestion, so no more than 2 threads are used anyway.
pull/448/head
Tpt 2 years ago committed by Thomas Tanon
parent f29a49bcd2
commit 13976014e7
  1. 25
      Cargo.lock
  2. 1
      lib/Cargo.toml
  3. 43
      lib/src/storage/mod.rs
  4. 14
      lib/src/store.rs

25
Cargo.lock generated

@ -867,15 +867,6 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
[[package]]
name = "ntapi"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc51db7b362b205941f71232e56c625156eb9a929f8cf74a428fd5bc094a4afc"
dependencies = [
"winapi 0.3.9",
]
[[package]] [[package]]
name = "num-traits" name = "num-traits"
version = "0.2.15" version = "0.2.15"
@ -960,7 +951,6 @@ dependencies = [
"siphasher", "siphasher",
"sparesults", "sparesults",
"spargebra", "spargebra",
"sysinfo",
"zstd", "zstd",
] ]
@ -1676,21 +1666,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sysinfo"
version = "0.28.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f69e0d827cce279e61c2f3399eb789271a8f136d8245edef70f06e3c9601a670"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"rayon",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "target-lexicon" name = "target-lexicon"
version = "0.12.6" version = "0.12.6"

@ -36,7 +36,6 @@ rio_xml = "0.8"
hex = "0.4" hex = "0.4"
siphasher = "0.3" siphasher = "0.3"
lazy_static = "1" lazy_static = "1"
sysinfo = "0.28"
oxrdf = { version = "0.1.5", path="oxrdf", features = ["rdf-star", "oxsdatatypes"] } oxrdf = { version = "0.1.5", path="oxrdf", features = ["rdf-star", "oxsdatatypes"] }
oxsdatatypes = { version = "0.1.1", path="oxsdatatypes" } oxsdatatypes = { version = "0.1.1", path="oxsdatatypes" }
spargebra = { version = "0.2.7", path="spargebra", features = ["rdf-star", "sep-0002", "sep-0006"] } spargebra = { version = "0.2.7", path="spargebra", features = ["rdf-star", "sep-0002", "sep-0006"] }

@ -16,8 +16,6 @@ use crate::storage::numeric_encoder::Decoder;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::cmp::{max, min};
#[cfg(not(target_family = "wasm"))]
use std::collections::VecDeque; use std::collections::VecDeque;
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
@ -34,8 +32,6 @@ use std::sync::Arc;
use std::thread::spawn; use std::thread::spawn;
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
use std::thread::JoinHandle; use std::thread::JoinHandle;
#[cfg(not(target_family = "wasm"))]
use sysinfo::{System, SystemExt};
mod backend; mod backend;
mod binary_encoder; mod binary_encoder;
@ -58,8 +54,6 @@ const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default"; const DEFAULT_CF: &str = "default";
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
#[cfg(not(target_family = "wasm"))]
const MAX_BULK_LOAD_BATCH_SIZE: usize = 100_000_000;
/// Low level storage primitives /// Low level storage primitives
#[derive(Clone)] #[derive(Clone)]
@ -1241,32 +1235,23 @@ impl StorageBulkLoader {
&self, &self,
quads: impl IntoIterator<Item = Result<Quad, EI>>, quads: impl IntoIterator<Item = Result<Quad, EI>>,
) -> Result<(), EO> { ) -> Result<(), EO> {
let system = System::new_all(); let num_threads = self.num_threads.unwrap_or(2);
let cpu_count = min(4, system.physical_core_count().unwrap_or(2)); if num_threads < 2 {
let num_threads = max( return Err(
if let Some(num_threads) = self.num_threads { StorageError::Other("The bulk loader needs at least 2 threads".into()).into(),
num_threads
} else if let Some(max_memory_size) = self.max_memory_size {
min(
cpu_count,
max_memory_size * 1000 / DEFAULT_BULK_LOAD_BATCH_SIZE,
)
} else {
cpu_count
},
2,
); );
let batch_size = min( }
if let Some(max_memory_size) = self.max_memory_size { let batch_size = if let Some(max_memory_size) = self.max_memory_size {
max(1000, max_memory_size * 1000 / num_threads) max_memory_size * 1000 / num_threads
} else { } else {
max( DEFAULT_BULK_LOAD_BATCH_SIZE
usize::try_from(system.free_memory()).unwrap() / 1000 / num_threads, };
DEFAULT_BULK_LOAD_BATCH_SIZE, if batch_size < 10_000 {
return Err(StorageError::Other(
"The bulk loader memory bound is too low. It needs at least 100MB".into(),
) )
}, .into());
MAX_BULK_LOAD_BATCH_SIZE, }
);
let mut threads = VecDeque::with_capacity(num_threads - 1); let mut threads = VecDeque::with_capacity(num_threads - 1);
let mut buffer = Vec::with_capacity(batch_size); let mut buffer = Vec::with_capacity(batch_size);
let done_counter = Arc::new(AtomicU64::new(0)); let done_counter = Arc::new(AtomicU64::new(0));

@ -1328,7 +1328,7 @@ impl Iterator for GraphNameIter {
/// Memory usage is configurable using [`BulkLoader::set_max_memory_size_in_megabytes`] /// Memory usage is configurable using [`BulkLoader::set_max_memory_size_in_megabytes`]
/// and the number of used threads with [`BulkLoader::set_num_threads`]. /// and the number of used threads with [`BulkLoader::set_num_threads`].
/// By default the memory consumption target (excluding the system and RocksDB internal consumption) /// By default the memory consumption target (excluding the system and RocksDB internal consumption)
/// is 1GB per thread and the number of threads is set to the number of logical CPU cores provided by the system. /// is around 2GB per thread and 2 threads.
/// These targets are considered per loaded file. /// These targets are considered per loaded file.
/// ///
/// Usage example with loading a dataset: /// Usage example with loading a dataset:
@ -1360,23 +1360,21 @@ impl BulkLoader {
/// ///
/// This number must be at least 2 (one for parsing and one for loading). /// This number must be at least 2 (one for parsing and one for loading).
/// ///
/// By default this is the number of logical CPU cores provided by the system except if /// The default value is 2.
/// [`BulkLoader::set_max_memory_size_in_megabytes`] is set. In this case at least one 1GB is reserved
/// per used thread.
pub fn set_num_threads(mut self, num_threads: usize) -> Self { pub fn set_num_threads(mut self, num_threads: usize) -> Self {
self.storage = self.storage.set_num_threads(num_threads); self.storage = self.storage.set_num_threads(num_threads);
self self
} }
/// Sets the maximal number of memory used by this operation. /// Sets a rough idea of the maximal amount of memory to be used by this operation.
/// ///
/// This number must be at least a few megabytes per thread. /// This number must be at least a few megabytes per thread.
/// ///
/// Memory used by RocksDB and the system is not taken into account in this limit. /// Memory used by RocksDB and the system is not taken into account in this limit.
/// Note that depending on the system behavior this amount might never be reached. /// Note that depending on the system behavior this amount might never be reached or be blown up
/// (for example if the data contains very long IRIs or literals).
/// ///
/// By default, at most 1GB per used thread is used /// By default, a target 2GB per used thread is used.
/// (i.e. at most GBs at the number of available logical CPU cores in total).
pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self { pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
self.storage = self self.storage = self
.storage .storage

Loading…
Cancel
Save