From e99d6f4ad086e809219838161c9b0f8a2596b8e3 Mon Sep 17 00:00:00 2001 From: Tpt Date: Sun, 28 Nov 2021 18:44:08 +0100 Subject: [PATCH] Makes bulk loader multithreaded --- lib/src/storage/mod.rs | 45 ++++++++++++++++++++++++++++-------------- lib/src/store.rs | 9 +++++---- 2 files changed, 35 insertions(+), 19 deletions(-) diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index a1a591d2..c92141d8 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -18,6 +18,7 @@ use std::mem::take; #[cfg(not(target_arch = "wasm32"))] use std::path::Path; use std::path::PathBuf; +use std::thread::spawn; mod backend; mod binary_encoder; @@ -37,7 +38,7 @@ const DPOS_CF: &str = "dpos"; const DOSP_CF: &str = "dosp"; const GRAPHS_CF: &str = "graphs"; const DEFAULT_CF: &str = "default"; -const AUTO_WRITE_BATCH_THRESHOLD: usize = 1024 * 1024; +const BULK_LOAD_BATCH_SIZE: usize = 1024 * 1024; /// Low level storage primitives #[derive(Clone)] @@ -154,7 +155,7 @@ impl Storage { if !quad.graph_name.is_default_graph() { transaction.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?; size += 1; - if size % AUTO_WRITE_BATCH_THRESHOLD == 0 { + if size % BULK_LOAD_BATCH_SIZE == 0 { let mut tr = this.db.transaction(); swap(&mut transaction, &mut tr); tr.commit()?; @@ -955,8 +956,29 @@ impl StorageWriter { /// Creates a database from a dataset files. #[cfg(not(target_arch = "wasm32"))] -pub struct BulkLoader<'a> { - storage: &'a Storage, +pub fn bulk_load(storage: &Storage, quads: impl IntoIterator>) -> Result<()> { + let mut threads = Vec::new(); + let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE); + for quad in quads { + let quad = quad?; + buffer.push(quad); + if buffer.len() >= BULK_LOAD_BATCH_SIZE { + let buffer = take(&mut buffer); + let storage = storage.clone(); + threads.push(spawn(move || BulkLoader::new(storage).load(buffer))); + } + } + BulkLoader::new(storage.clone()).load(buffer)?; // Last buffer + for thread in threads { + thread.join().unwrap()?; + } + Ok(()) +} + +/// Creates a database from a dataset files. +#[cfg(not(target_arch = "wasm32"))] +struct BulkLoader { + storage: Storage, id2str: HashMap>, quads: HashSet, triples: HashSet, @@ -965,8 +987,8 @@ pub struct BulkLoader<'a> { } #[cfg(not(target_arch = "wasm32"))] -impl<'a> BulkLoader<'a> { - pub fn new(storage: &'a Storage) -> Self { +impl BulkLoader { + fn new(storage: Storage) -> Self { Self { storage, id2str: HashMap::default(), @@ -977,10 +999,8 @@ impl<'a> BulkLoader<'a> { } } - pub fn load(&mut self, quads: impl IntoIterator>) -> Result<()> { - let mut count = 0; + fn load(&mut self, quads: impl IntoIterator) -> Result<()> { for quad in quads { - let quad = quad?; let encoded = EncodedQuad::from(quad.as_ref()); self.buffer.clear(); if quad.graph_name.is_default_graph() { @@ -1011,13 +1031,8 @@ impl<'a> BulkLoader<'a> { } } } - count += 1; - if count % (1024 * 1024) == 0 { - self.save()?; - } } - self.save()?; - self.storage.compact() + self.save() } fn save(&mut self) -> Result<()> { diff --git a/lib/src/store.rs b/lib/src/store.rs index d78e5996..a52cae1b 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -30,10 +30,10 @@ use crate::sparql::{ evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update, UpdateOptions, }; +#[cfg(not(target_arch = "wasm32"))] +use crate::storage::bulk_load; use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; -#[cfg(not(target_arch = "wasm32"))] -use crate::storage::BulkLoader; use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader}; use std::io::{BufRead, Write}; #[cfg(not(target_arch = "wasm32"))] @@ -618,7 +618,7 @@ impl Store { .with_base_iri(base_iri) .map_err(invalid_input_error)?; } - BulkLoader::new(&self.storage).load(parser.read_quads(reader)?) + bulk_load(&self.storage, parser.read_quads(reader)?) } /// Loads a dataset file efficiently into the store. @@ -662,7 +662,8 @@ impl Store { .map_err(invalid_input_error)?; } let to_graph_name = to_graph_name.into(); - BulkLoader::new(&self.storage).load( + bulk_load( + &self.storage, parser .read_triples(reader)? .map(|r| Ok(r?.in_graph(to_graph_name.into_owned()))),