Bulk loader: rewrite API and add hook to display progress indicators

pull/190/head
Tpt 3 years ago
parent 66cfb3ebf6
commit adea1899e5
  1. 3
      lib/benches/store.rs
  2. 2
      lib/spargebra/src/parser.rs
  3. 148
      lib/src/storage/mod.rs
  4. 247
      lib/src/store.rs
  5. 11
      lib/tests/store.rs
  6. 6
      python/src/store.rs
  7. 37
      server/src/main.rs

@ -75,7 +75,8 @@ fn do_load(store: &Store, data: &[u8]) {
fn do_bulk_load(store: &Store, data: &[u8]) { fn do_bulk_load(store: &Store, data: &[u8]) {
store store
.bulk_load_graph( .bulk_loader()
.load_graph(
Cursor::new(&data), Cursor::new(&data),
GraphFormat::NTriples, GraphFormat::NTriples,
GraphNameRef::DefaultGraph, GraphNameRef::DefaultGraph,

@ -1265,7 +1265,7 @@ parser! {
} / i:InsertClause() { } / i:InsertClause() {
(None, Some(i)) (None, Some(i))
} }
rule Modify_clear() -> () = { rule Modify_clear() = {
state.used_bnodes.clear(); state.used_bnodes.clear();
state.currently_used_bnodes.clear(); state.currently_used_bnodes.clear();
} }

@ -13,6 +13,7 @@ use crate::storage::numeric_encoder::{
insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup, insert_term, Decoder, EncodedQuad, EncodedTerm, StrHash, StrLookup,
}; };
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
use std::collections::VecDeque;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::error::Error; use std::error::Error;
@ -20,8 +21,11 @@ use std::error::Error;
use std::mem::take; use std::mem::take;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::thread::spawn; use std::thread::spawn;
use std::thread::JoinHandle;
mod backend; mod backend;
mod binary_encoder; mod binary_encoder;
@ -42,7 +46,7 @@ const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs"; const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default"; const DEFAULT_CF: &str = "default";
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
const BULK_LOAD_BATCH_SIZE: usize = 1024 * 1024; const BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
/// Low level storage primitives /// Low level storage primitives
#[derive(Clone)] #[derive(Clone)]
@ -1149,47 +1153,109 @@ impl<'a> StorageWriter<'a> {
} }
} }
/// Creates a database from a dataset files.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load< pub struct StorageBulkLoader {
EI, storage: Storage,
EO: From<StorageError> + From<EI>, hooks: Vec<Box<dyn Fn(u64)>>,
I: IntoIterator<Item = Result<Quad, EI>>, num_threads: usize,
>( }
storage: &Storage,
quads: I, #[cfg(not(target_arch = "wasm32"))]
) -> Result<(), EO> { impl StorageBulkLoader {
let mut threads = Vec::new(); pub fn new(storage: Storage) -> Self {
let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE); Self {
for quad in quads { storage,
let quad = quad?; hooks: Vec::new(),
buffer.push(quad); num_threads: num_cpus::get() * 4,
if buffer.len() >= BULK_LOAD_BATCH_SIZE { }
let buffer = take(&mut buffer); }
let storage = storage.clone();
threads.push(spawn(move || BulkLoader::new(storage).load(buffer))); pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
self.hooks.push(Box::new(callback));
self
}
pub fn load<EI, EO: From<StorageError> + From<EI>, I: IntoIterator<Item = Result<Quad, EI>>>(
&self,
quads: I,
) -> Result<(), EO> {
let mut threads = VecDeque::with_capacity(self.num_threads);
let mut buffer = Vec::with_capacity(BULK_LOAD_BATCH_SIZE);
let done_counter = Arc::new(AtomicU64::new(0));
let mut done_and_displayed_counter = 0;
for quad in quads {
let quad = quad?;
buffer.push(quad);
if buffer.len() >= BULK_LOAD_BATCH_SIZE {
self.spawn_load_thread(
&mut buffer,
&mut threads,
&done_counter,
&mut done_and_displayed_counter,
)?;
}
}
self.spawn_load_thread(
&mut buffer,
&mut threads,
&done_counter,
&mut done_and_displayed_counter,
)?;
for thread in threads {
thread.join().unwrap()?;
self.on_possible_progress(&done_counter, &mut done_and_displayed_counter);
}
Ok(())
}
fn spawn_load_thread(
&self,
buffer: &mut Vec<Quad>,
threads: &mut VecDeque<JoinHandle<Result<(), StorageError>>>,
done_counter: &Arc<AtomicU64>,
done_and_displayed_counter: &mut u64,
) -> Result<(), StorageError> {
self.on_possible_progress(done_counter, done_and_displayed_counter);
// We avoid to have too many threads
if threads.len() >= self.num_threads {
if let Some(thread) = threads.pop_front() {
thread.join().unwrap()?;
self.on_possible_progress(done_counter, done_and_displayed_counter);
}
} }
let buffer = take(buffer);
let storage = self.storage.clone();
let done_counter_clone = done_counter.clone();
threads.push_back(spawn(move || {
FileBulkLoader::new(storage).load(buffer, &done_counter_clone)
}));
self.on_possible_progress(done_counter, done_and_displayed_counter);
Ok(())
} }
BulkLoader::new(storage.clone()).load(buffer)?; // Last buffer
for thread in threads { fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) {
thread.join().unwrap()?; let new_counter = done.fetch_max(*done_and_displayed, Ordering::Relaxed);
let display_step = u64::try_from(BULK_LOAD_BATCH_SIZE).unwrap();
if new_counter % display_step > *done_and_displayed % display_step {
for hook in &self.hooks {
hook(new_counter);
}
}
*done_and_displayed = new_counter;
} }
Ok(())
} }
/// Creates a database from a dataset files.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
struct BulkLoader { struct FileBulkLoader {
storage: Storage, storage: Storage,
id2str: HashMap<StrHash, Box<str>>, id2str: HashMap<StrHash, Box<str>>,
quads: HashSet<EncodedQuad>, quads: HashSet<EncodedQuad>,
triples: HashSet<EncodedQuad>, triples: HashSet<EncodedQuad>,
graphs: HashSet<EncodedTerm>, graphs: HashSet<EncodedTerm>,
buffer: Vec<u8>,
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
impl BulkLoader { impl FileBulkLoader {
fn new(storage: Storage) -> Self { fn new(storage: Storage) -> Self {
Self { Self {
storage, storage,
@ -1197,30 +1263,42 @@ impl BulkLoader {
quads: HashSet::default(), quads: HashSet::default(),
triples: HashSet::default(), triples: HashSet::default(),
graphs: HashSet::default(), graphs: HashSet::default(),
buffer: Vec::new(),
} }
} }
fn load(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> { fn load(
&mut self,
quads: impl IntoIterator<Item = Quad>,
counter: &AtomicU64,
) -> Result<(), StorageError> {
self.encode(quads)?;
let size = self.triples.len() + self.quads.len();
self.save()?;
counter.fetch_add(size.try_into().unwrap(), Ordering::Relaxed);
Ok(())
}
fn encode(&mut self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
let mut buffer = Vec::new();
for quad in quads { for quad in quads {
let encoded = EncodedQuad::from(quad.as_ref()); let encoded = EncodedQuad::from(quad.as_ref());
self.buffer.clear(); buffer.clear();
if quad.graph_name.is_default_graph() { if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, &encoded); write_spo_quad(&mut buffer, &encoded);
if self.triples.insert(encoded.clone()) { if self.triples.insert(encoded.clone()) {
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?; self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?; self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?; self.insert_term(quad.object.as_ref(), &encoded.object)?;
} }
} else { } else {
write_spog_quad(&mut self.buffer, &encoded); write_spog_quad(&mut buffer, &encoded);
if self.quads.insert(encoded.clone()) { if self.quads.insert(encoded.clone()) {
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?; self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?; self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?; self.insert_term(quad.object.as_ref(), &encoded.object)?;
self.buffer.clear(); buffer.clear();
write_term(&mut self.buffer, &encoded.graph_name); write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert(encoded.graph_name.clone()) { if self.graphs.insert(encoded.graph_name.clone()) {
self.insert_term( self.insert_term(
match quad.graph_name.as_ref() { match quad.graph_name.as_ref() {
@ -1234,7 +1312,7 @@ impl BulkLoader {
} }
} }
} }
self.save() Ok(())
} }
fn save(&mut self) -> Result<(), StorageError> { fn save(&mut self) -> Result<(), StorageError> {

@ -32,9 +32,9 @@ use crate::sparql::{
evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update, evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
UpdateOptions, UpdateOptions,
}; };
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::bulk_load;
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::StorageBulkLoader;
use crate::storage::{ use crate::storage::{
ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter, ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
}; };
@ -365,7 +365,7 @@ impl Store {
/// Loads a graph file (i.e. triples) into the store. /// Loads a graph file (i.e. triples) into the store.
/// ///
/// This function is atomic, quite slow and memory hungry. To get much better performances you might want to use [`bulk_load_graph`](Store::bulk_load_graph). /// This function is atomic, quite slow and memory hungry. To get much better performances you might want to use the [`bulk_loader`](Store::bulk_loader).
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -411,7 +411,7 @@ impl Store {
/// Loads a dataset file (i.e. quads) into the store. /// Loads a dataset file (i.e. quads) into the store.
/// ///
/// This function is atomic, quite slow and memory hungry. To get much better performances you might want to [`bulk_load_dataset`](Store::bulk_load_dataset). /// This function is atomic, quite slow and memory hungry. To get much better performances you might want to use the [`bulk_loader`](Store::bulk_loader).
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -477,7 +477,7 @@ impl Store {
/// Adds atomically a set of quads to this store. /// Adds atomically a set of quads to this store.
/// ///
/// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`](Store::bulk_extend) if you plan to add ten of millions of triples. /// Warning: This operation uses a memory heavy transaction internally, use the [`bulk_loader`](Store::bulk_loader) if you plan to add ten of millions of triples.
pub fn extend( pub fn extend(
&self, &self,
quads: impl IntoIterator<Item = impl Into<Quad>>, quads: impl IntoIterator<Item = impl Into<Quad>>,
@ -753,15 +753,7 @@ impl Store {
self.storage.backup(target_directory.as_ref()) self.storage.backup(target_directory.as_ref())
} }
/// Loads a dataset file efficiently into the store. /// Creates a bulk loader allowing to load at lot of data quickly into the store.
///
/// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient.
///
/// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -771,9 +763,9 @@ impl Store {
/// ///
/// let store = Store::new()?; /// let store = Store::new()?;
/// ///
/// // insertion /// // quads file insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> ."; /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?; /// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
/// ///
/// // we inspect the store contents /// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?; /// let ex = NamedNodeRef::new("http://example.com")?;
@ -781,81 +773,10 @@ impl Store {
/// # Result::<_, Box<dyn std::error::Error>>::Ok(()) /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load_dataset( pub fn bulk_loader(&self) -> BulkLoader {
&self, BulkLoader {
reader: impl BufRead, storage: StorageBulkLoader::new(self.storage.clone()),
format: DatasetFormat,
base_iri: Option<&str>,
) -> Result<(), LoaderError> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
} }
bulk_load(&self.storage, parser.read_quads(reader)?)
}
/// Loads a dataset file efficiently into the store.
///
/// This function is optimized for large dataset loading speed. For small files, [`load_graph`](Store::load_graph) might be more convenient.
///
/// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::GraphFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_load_graph(file.as_ref(), GraphFormat::NTriples, GraphNameRef::DefaultGraph, None)?;
///
/// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load_graph<'a>(
&self,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> Result<(), LoaderError> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
}
let to_graph_name = to_graph_name.into();
bulk_load(
&self.storage,
parser
.read_triples(reader)?
.map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))),
)
}
/// Adds a set of triples to this store using bulk load.
///
/// Warning: This method is not atomic.
/// If the process fails in the middle of the file, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
#[cfg(not(target_arch = "wasm32"))]
pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
bulk_load::<StorageError, _, _>(&self.storage, quads.into_iter().map(Ok))
} }
/// Validates that all the store invariants held in the data /// Validates that all the store invariants held in the data
@ -1155,9 +1076,7 @@ impl<'a> Transaction<'a> {
self.writer.insert(quad.into()) self.writer.insert(quad.into())
} }
/// Adds atomically a set of quads to this store. /// Adds a set of quads to this store.
///
/// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`](Store::bulk_extend) if you plan to add ten of millions of triples.
pub fn extend<'b>( pub fn extend<'b>(
&mut self, &mut self,
quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>, quads: impl IntoIterator<Item = impl Into<QuadRef<'b>>>,
@ -1346,6 +1265,148 @@ impl Iterator for GraphNameIter {
} }
} }
/// A bulk loader allowing to load at lot of data quickly into the store.
///
/// Warning: The operations provided here are not atomic.
/// If the operation fails in the middle, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: It is optimized for speed. It uses multiple threads and GBs of RAM on large files.
///
/// Usage example with loading a dataset:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
///
/// // quads file insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
///
/// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub struct BulkLoader {
storage: StorageBulkLoader,
}
#[cfg(not(target_arch = "wasm32"))]
impl BulkLoader {
/// Adds a `callback` evaluated from time to time with the number of loaded triples.
pub fn on_progress(self, callback: impl Fn(u64) + 'static) -> Self {
Self {
storage: self.storage.on_progress(callback),
}
}
/// Loads a dataset file using the bulk loader.
///
/// This function is optimized for large dataset loading speed. For small files, [`Store::load_dataset`] might be more convenient.
///
/// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_loader().load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
///
/// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn load_dataset(
&self,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> Result<(), LoaderError> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
}
self.storage.load(parser.read_quads(reader)?)
}
/// Loads a graph file using the bulk loader.
///
/// This function is optimized for large graph loading speed. For small files, [`Store::load_graph`] might be more convenient.
///
/// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::GraphFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_loader().load_graph(file.as_ref(), GraphFormat::NTriples, GraphNameRef::DefaultGraph, None)?;
///
/// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn load_graph<'a>(
&self,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> Result<(), LoaderError> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| ParseError::invalid_base_iri(base_iri, e))?;
}
let to_graph_name = to_graph_name.into();
self.storage.load(
parser
.read_triples(reader)?
.map(|r| r.map(|q| q.in_graph(to_graph_name.into_owned()))),
)
}
/// Adds a set of quads using the bulk loader.
///
/// Warning: This method is not atomic.
/// If the process fails in the middle of the file, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process.
///
/// Warning: This method is optimized for speed. It uses multiple threads and GBs of RAM on large files.
pub fn load_quads(&self, quads: impl IntoIterator<Item = Quad>) -> Result<(), StorageError> {
self.storage
.load::<StorageError, _, _>(quads.into_iter().map(Ok))
}
}
#[test] #[test]
fn store() -> Result<(), StorageError> { fn store() -> Result<(), StorageError> {
use crate::model::*; use crate::model::*;

@ -113,7 +113,7 @@ fn test_load_graph() -> Result<(), Box<dyn Error>> {
#[test] #[test]
fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> { fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store.bulk_load_graph( store.bulk_loader().load_graph(
Cursor::new(DATA), Cursor::new(DATA),
GraphFormat::Turtle, GraphFormat::Turtle,
GraphNameRef::DefaultGraph, GraphNameRef::DefaultGraph,
@ -142,7 +142,9 @@ fn test_load_dataset() -> Result<(), Box<dyn Error>> {
#[test] #[test]
fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> { fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new().unwrap(); let store = Store::new().unwrap();
store.bulk_load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?; store
.bulk_loader()
.load_dataset(Cursor::new(GRAPH_DATA), DatasetFormat::TriG, None)?;
for q in quads(NamedNodeRef::new_unchecked( for q in quads(NamedNodeRef::new_unchecked(
"http://www.wikidata.org/wiki/Special:EntityData/Q90", "http://www.wikidata.org/wiki/Special:EntityData/Q90",
)) { )) {
@ -233,7 +235,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dy
); );
let store = Store::new()?; let store = Store::new()?;
store.remove(quad)?; store.remove(quad)?;
store.bulk_extend([quad.into_owned()])?; store.bulk_loader().load_quads([quad.into_owned()])?;
assert_eq!(store.len()?, 1); assert_eq!(store.len()?, 1);
Ok(()) Ok(())
} }
@ -256,7 +258,8 @@ fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
let store = Store::open(&dir.0)?; let store = Store::open(&dir.0)?;
remove_dir_all(&dir.0)?; remove_dir_all(&dir.0)?;
assert!(store assert!(store
.bulk_extend(once(Quad { .bulk_loader()
.load_quads(once(Quad {
subject: NamedNode::new_unchecked("http://example.com/s").into(), subject: NamedNode::new_unchecked("http://example.com/s").into(),
predicate: NamedNode::new_unchecked("http://example.com/p"), predicate: NamedNode::new_unchecked("http://example.com/p"),
object: NamedNode::new_unchecked("http://example.com/o").into(), object: NamedNode::new_unchecked("http://example.com/o").into(),

@ -376,7 +376,8 @@ impl PyStore {
py.allow_threads(|| { py.allow_threads(|| {
if let Some(graph_format) = GraphFormat::from_media_type(mime_type) { if let Some(graph_format) = GraphFormat::from_media_type(mime_type) {
self.inner self.inner
.bulk_load_graph( .bulk_loader()
.load_graph(
input, input,
graph_format, graph_format,
&to_graph_name.unwrap_or(GraphName::DefaultGraph), &to_graph_name.unwrap_or(GraphName::DefaultGraph),
@ -390,7 +391,8 @@ impl PyStore {
)); ));
} }
self.inner self.inner
.bulk_load_dataset(input, dataset_format, base_iri) .bulk_loader()
.load_dataset(input, dataset_format, base_iri)
.map_err(map_loader_error) .map_err(map_loader_error)
} else { } else {
Err(PyValueError::new_err(format!( Err(PyValueError::new_err(format!(

@ -28,7 +28,7 @@ use std::path::PathBuf;
use std::rc::Rc; use std::rc::Rc;
use std::str::FromStr; use std::str::FromStr;
use std::thread::{spawn, JoinHandle}; use std::thread::{spawn, JoinHandle};
use std::time::Duration; use std::time::{Duration, Instant};
use url::form_urlencoded; use url::form_urlencoded;
const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576; const MAX_SPARQL_BODY_SIZE: u64 = 1_048_576;
@ -78,20 +78,29 @@ pub fn main() -> std::io::Result<()> {
let store = store.clone(); let store = store.clone();
let file = file.to_string(); let file = file.to_string();
spawn(move || { spawn(move || {
let format = file let f = file.clone();
let start = Instant::now();
let loader = store.bulk_loader().on_progress(move |size| {
let elapsed = start.elapsed();
println!("{} triples loaded in {}s ({} t/s) from {}", size, elapsed.as_secs(), size / elapsed.as_secs(), f)
});
let reader = BufReader::new(File::open(&file)?);
if let Some(format) = file
.rsplit_once('.') .rsplit_once('.')
.and_then(|(_, extension)| { .and_then(|(_, extension)| DatasetFormat::from_extension(extension)) {
DatasetFormat::from_extension(extension) loader.load_dataset(reader, format, None)?;
.or_else(|| GraphFormat::from_extension(extension)?.try_into().ok()) Ok(())
}) } else if let Some(format) = file
.ok_or_else(|| { .rsplit_once('.')
io::Error::new( .and_then(|(_, extension)| GraphFormat::from_extension(extension)) {
ErrorKind::InvalidInput, loader.load_graph(reader, format, GraphNameRef::DefaultGraph, None)?;
"The server is not able to guess the file format of {} from its extension", Ok(())
) } else {
})?; Err(io::Error::new(
store.bulk_load_dataset(BufReader::new(File::open(file)?), format, None)?; ErrorKind::InvalidInput,
Ok(()) "The server is not able to guess the file format of {} from its extension",
))
}
}) })
}).collect::<Vec<JoinHandle<io::Result<()>>>>(); }).collect::<Vec<JoinHandle<io::Result<()>>>>();
for handle in handles { for handle in handles {

Loading…
Cancel
Save