Bulk loader: use as much memory as possible

pull/192/head
Tpt 3 years ago
parent c5f12f10f6
commit d0b3d76bf1
  1. 26
      Cargo.lock
  2. 2
      lib/Cargo.toml
  3. 6
      lib/src/storage/backend/rocksdb.rs
  4. 24
      lib/src/storage/mod.rs

26
Cargo.lock generated

@ -710,6 +710,15 @@ dependencies = [
"version_check",
]
+ [[package]]
+ name = "ntapi"
+ version = "0.3.7"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "c28774a7fd2fbb4f0babd8237ce554b73af68021b5f695a3cebd6c59bac0980f"
+ dependencies = [
+ "winapi 0.3.9",
+ ]
[[package]]
name = "num-traits"
version = "0.2.14"
@ -792,7 +801,6 @@ dependencies = [
"libc",
"md-5",
"nom",
- "num_cpus",
"oxhttp",
"oxilangtag",
"oxiri",
@ -808,6 +816,7 @@ dependencies = [
"siphasher",
"sparesults",
"spargebra",
+ "sysinfo",
"wasm-bindgen-test",
"zstd",
]
@ -1478,6 +1487,21 @@ dependencies = [
"unicode-xid",
]
+ [[package]]
+ name = "sysinfo"
+ version = "0.23.5"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "07fa4c84a5305909b0eedfcc8d1f2fafdbede645bb700a45ecaafe681a0ac5d6"
+ dependencies = [
+ "cfg-if",
+ "core-foundation-sys",
+ "libc",
+ "ntapi",
+ "once_cell",
+ "rayon",
+ "winapi 0.3.9",
+ ]
[[package]]
name = "term"
version = "0.2.14"

@ -36,7 +36,7 @@ hex = "0.4"
nom = "7"
siphasher = "0.3"
lazy_static = "1"
- num_cpus = "1"
+ sysinfo = "0.23"
oxrdf = { version = "0.1.0-beta.4", path="oxrdf", features = ["rdf-star"] }
spargebra = { version = "0.2.0-beta.4", path="spargebra", features = ["rdf-star"] }
sparesults = { version = "0.1.0-beta.4", path="sparesults", features = ["rdf-star"] }

@ -25,6 +25,7 @@ use std::rc::{Rc, Weak};
use std::sync::Arc;
use std::thread::yield_now;
use std::{ptr, slice};
+ use sysinfo::{System, SystemExt};
macro_rules! ffi_result {
( $($function:ident)::*() ) => {
@ -68,6 +69,7 @@ lazy_static! {
UnsafeEnv(env)
}
};
+ static ref CPU_COUNT: Option<usize> = System::new().physical_core_count();
}
pub struct ColumnFamilyDefinition {
@ -166,7 +168,9 @@ impl Db {
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1);
rocksdb_options_optimize_level_style_compaction(options, 512 * 1024 * 1024);
- rocksdb_options_increase_parallelism(options, num_cpus::get().try_into().unwrap());
+ if let Some(cpu_count) = *CPU_COUNT {
+ rocksdb_options_increase_parallelism(options, cpu_count.try_into().unwrap());
+ }
rocksdb_options_set_info_log_level(options, 2); // We only log warnings
rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size
rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files

@ -27,6 +27,7 @@ use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))]
use std::thread::spawn;
use std::thread::JoinHandle;
+ use sysinfo::{System, SystemExt};
mod backend;
mod binary_encoder;
@ -48,6 +49,7 @@ const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default";
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
+ const MAX_BULK_LOAD_BATCH_SIZE: usize = 100_000_000;
/// Low level storage primitives
#[derive(Clone)]
@ -1192,24 +1194,32 @@ impl StorageBulkLoader {
&self,
quads: I,
) -> Result<(), EO> {
+ let system = System::new_all();
+ let cpu_count = min(8, system.physical_core_count().unwrap_or(2));
let num_threads = max(
if let Some(num_threads) = self.num_threads {
num_threads
} else if let Some(max_memory_size) = self.max_memory_size {
min(
- num_cpus::get(),
+ cpu_count,
max_memory_size * 1000 / DEFAULT_BULK_LOAD_BATCH_SIZE,
)
} else {
- num_cpus::get()
+ cpu_count
},
2,
);
- let batch_size = if let Some(max_memory_size) = self.max_memory_size {
- max(1000, max_memory_size * 1000 / num_threads)
- } else {
- DEFAULT_BULK_LOAD_BATCH_SIZE
- };
+ let batch_size = min(
+ if let Some(max_memory_size) = self.max_memory_size {
+ max(1000, max_memory_size * 1000 / num_threads)
+ } else {
+ max(
+ usize::try_from(system.free_memory()).unwrap() / num_threads,
+ DEFAULT_BULK_LOAD_BATCH_SIZE,
+ )
+ },
+ MAX_BULK_LOAD_BATCH_SIZE,
+ );
let mut threads = VecDeque::with_capacity(num_threads - 1);
let mut buffer = Vec::with_capacity(batch_size);
let done_counter = Arc::new(AtomicU64::new(0));

Loading…
Cancel
Save