From a3e40556dace57175f8b1eda671a45ae2adef4c4 Mon Sep 17 00:00:00 2001 From: Tpt Date: Tue, 30 Nov 2021 20:26:24 +0100 Subject: [PATCH] Automatically retries transactions on failure --- bench/bsbm_oxigraph.sh | 4 +- lib/src/sparql/error.rs | 5 +- lib/src/sparql/mod.rs | 10 +-- lib/src/sparql/update.rs | 90 +++++++++++--------- lib/src/storage/backend/rocksdb.rs | 35 ++++++-- lib/src/storage/io.rs | 70 ---------------- lib/src/storage/mod.rs | 90 +++++++------------- lib/src/store.rs | 127 +++++++++++++++++++---------- 8 files changed, 199 insertions(+), 232 deletions(-) delete mode 100644 lib/src/storage/io.rs diff --git a/bench/bsbm_oxigraph.sh b/bench/bsbm_oxigraph.sh index 87ad1ce2..b8c96e5e 100755 --- a/bench/bsbm_oxigraph.sh +++ b/bench/bsbm_oxigraph.sh @@ -7,7 +7,7 @@ cd bsbm-tools cargo build --release --manifest-path="../../server/Cargo.toml" ./../../target/release/oxigraph_server --location oxigraph_data load --file "explore-${DATASET_SIZE}.nt" ./../../target/release/oxigraph_server --location oxigraph_data serve --bind 127.0.0.1:7878 & -sleep 5 +sleep 1 curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://127.0.0.1:7878/store?default ./testdriver -mt ${PARALLELISM} -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${PARALLELISM}.main.xml" http://127.0.0.1:7878/query ./testdriver -mt ${PARALLELISM} -ucf usecases/exploreAndUpdate/sparql.txt -o "../bsbm.exploreAndUpdate.oxigraph.${DATASET_SIZE}.${PARALLELISM}.main.xml" http://127.0.0.1:7878/query -u http://127.0.0.1:7878/update -udataset "explore-update-${DATASET_SIZE}.nt" @@ -16,4 +16,4 @@ kill $! rm -r oxigraph_data rm "explore-${DATASET_SIZE}.nt" rm "explore-update-${DATASET_SIZE}.nt" -rm -r td_data \ No newline at end of file +rm -r td_data diff --git a/lib/src/sparql/error.rs b/lib/src/sparql/error.rs index 0f0a52b3..e112d435 100644 --- a/lib/src/sparql/error.rs +++ b/lib/src/sparql/error.rs @@ -16,9 +16,8 @@ pub enum EvaluationError { Io(io::Error), /// An error returned during the query evaluation itself Query(QueryError), - /// A conflict during a transaction #[doc(hidden)] - Conflict, + Extra, } #[derive(Debug)] @@ -38,7 +37,7 @@ impl fmt::Display for EvaluationError { Self::Parsing(error) => error.fmt(f), Self::Io(error) => error.fmt(f), Self::Query(error) => error.fmt(f), - Self::Conflict => write!(f, "Transaction conflict"), + Self::Extra => write!(f, "Unknown error"), } } } diff --git a/lib/src/sparql/mod.rs b/lib/src/sparql/mod.rs index 91b4645d..7445a179 100644 --- a/lib/src/sparql/mod.rs +++ b/lib/src/sparql/mod.rs @@ -29,7 +29,7 @@ pub use crate::sparql::model::{Variable, VariableNameParseError}; use crate::sparql::plan_builder::PlanBuilder; pub use crate::sparql::service::ServiceHandler; use crate::sparql::service::{EmptyServiceHandler, ErrorConversionServiceHandler}; -use crate::sparql::update::SimpleUpdateEvaluator; +pub(crate) use crate::sparql::update::evaluate_update; use crate::storage::Storage; pub use spargebra::ParseError; use std::rc::Rc; @@ -175,11 +175,3 @@ impl From for UpdateOptions { Self { query_options } } } - -pub(crate) fn evaluate_update( - storage: &Storage, - update: Update, - options: UpdateOptions, -) -> Result<(), EvaluationError> { - SimpleUpdateEvaluator::run(storage, update, options) -} diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 8128e40a..32e9ff9c 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -1,4 +1,5 @@ -use crate::io::GraphFormat; +use crate::error::invalid_input_error; +use crate::io::{GraphFormat, GraphParser}; use crate::model::{ BlankNode as OxBlankNode, GraphName as OxGraphName, GraphNameRef, Literal as OxLiteral, NamedNode as OxNamedNode, NamedNodeRef, Quad as OxQuad, Term as OxTerm, Triple as OxTriple, @@ -10,7 +11,6 @@ use crate::sparql::http::Client; use crate::sparql::plan::EncodedTuple; use crate::sparql::plan_builder::PlanBuilder; use crate::sparql::{EvaluationError, Update, UpdateOptions}; -use crate::storage::io::load_graph; use crate::storage::numeric_encoder::{Decoder, EncodedTerm}; use crate::storage::{Storage, StorageWriter}; use oxiri::Iri; @@ -23,42 +23,50 @@ use spargebra::term::{ }; use spargebra::GraphUpdateOperation; use std::collections::HashMap; -use std::io; -use std::io::BufReader; +use std::io::{BufReader, Error, ErrorKind}; use std::rc::Rc; -pub struct SimpleUpdateEvaluator { - transaction: StorageWriter, - base_iri: Option>>, +pub fn evaluate_update( + storage: &Storage, + update: Update, options: UpdateOptions, - client: Client, -} - -impl SimpleUpdateEvaluator { - pub fn run( - storage: &Storage, - update: Update, - options: UpdateOptions, - ) -> Result<(), EvaluationError> { - let client = Client::new(options.query_options.http_timeout); - let mut evaluator = Self { - transaction: storage.transaction(), - base_iri: update.inner.base_iri.map(Rc::new), - options, - client, - }; - match evaluator.eval_all(&update.inner.operations, &update.using_datasets) { - Ok(_) => { - evaluator.transaction.commit()?; - Ok(()) +) -> Result<(), EvaluationError> { + let base_iri = update.inner.base_iri.map(Rc::new); + let client = Client::new(options.query_options.http_timeout); + storage + .transaction(move |transaction| { + SimpleUpdateEvaluator { + transaction, + base_iri: base_iri.clone(), + options: &options, + client: &client, } - Err(e) => { - evaluator.transaction.rollback()?; - Err(e) + .eval_all(&update.inner.operations, &update.using_datasets) + .map_err(|e| match e { + EvaluationError::Io(e) => e, + q => Error::new(ErrorKind::Other, q), + }) + }) + .map_err(|e| { + if e.get_ref() + .map_or(false, |inner| inner.is::()) + { + *e.into_inner().unwrap().downcast().unwrap() + } else { + EvaluationError::Io(e) } - } - } + })?; + Ok(()) +} +struct SimpleUpdateEvaluator<'a> { + transaction: StorageWriter<'a>, + base_iri: Option>>, + options: &'a UpdateOptions, + client: &'a Client, +} + +impl SimpleUpdateEvaluator<'_> { fn eval_all( &mut self, updates: &[GraphUpdateOperation], @@ -173,14 +181,16 @@ impl SimpleUpdateEvaluator { GraphName::NamedNode(graph_name) => NamedNodeRef::new_unchecked(&graph_name.iri).into(), GraphName::DefaultGraph => GraphNameRef::DefaultGraph, }; - load_graph( - &mut self.transaction, - BufReader::new(body), - format, - to_graph_name, - Some(&from.iri), - ) - .map_err(io::Error::from)?; + let mut parser = GraphParser::from_format(format); + if let Some(base_iri) = &self.base_iri { + parser = parser + .with_base_iri(base_iri.as_str()) + .map_err(invalid_input_error)?; + } + for t in parser.read_triples(BufReader::new(body))? { + self.transaction + .insert(t?.as_ref().in_graph(to_graph_name))?; + } Ok(()) } diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 8818bf05..1393c693 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -351,8 +351,33 @@ impl Db { } } + pub fn transaction(&self, f: impl Fn(&mut Transaction) -> Result) -> Result { + loop { + let mut transaction = self.build_transaction(); + match { f(&mut transaction) } { + Ok(result) => { + transaction.commit()?; + return Ok(result); + } + Err(e) => { + transaction.rollback()?; + let is_conflict_error = e.get_ref().map_or(false, |e| { + let msg = e.to_string(); + msg == "Resource busy: " + || msg == "Operation timed out: Timeout waiting to lock key" + }); + if is_conflict_error { + // We retry + continue; + } + return Err(e); + } + } + } + } + #[must_use] - pub fn transaction(&self) -> Transaction { + fn build_transaction(&self) -> Transaction { unsafe { let transaction = rocksdb_transaction_begin( self.0.db, @@ -638,12 +663,12 @@ impl Drop for InnerTransaction { } impl Transaction { - pub fn commit(self) -> Result<()> { + fn commit(self) -> Result<()> { self.inner.is_ended.set(true); unsafe { ffi_result!(rocksdb_transaction_commit(self.inner.transaction)) } } - pub fn rollback(self) -> Result<()> { + fn rollback(self) -> Result<()> { self.inner.is_ended.set(true); unsafe { ffi_result!(rocksdb_transaction_rollback(self.inner.transaction)) } } @@ -877,7 +902,7 @@ fn convert_error(ptr: *const c_char) -> Error { } fn other_error(error: impl Into>) -> Error { - Error::new(ErrorKind::InvalidInput, error) + Error::new(ErrorKind::Other, error) } struct UnsafeEnv(*mut rocksdb_env_t); @@ -897,7 +922,7 @@ fn path_to_cstring(path: &Path) -> Result { fn test_transaction_read_after_commit() -> Result<()> { let db = Db::new(vec![])?; let cf = db.column_family("default").unwrap(); - let mut tr = db.transaction(); + let mut tr = db.build_transaction(); let reader = tr.reader(); tr.insert(&cf, b"test", b"foo")?; assert_eq!(reader.get(&cf, b"test")?.as_deref(), Some(b"foo".as_ref())); diff --git a/lib/src/storage/io.rs b/lib/src/storage/io.rs deleted file mode 100644 index ae6edc79..00000000 --- a/lib/src/storage/io.rs +++ /dev/null @@ -1,70 +0,0 @@ -//! Utilities for I/O from the store - -use crate::error::invalid_input_error; -use crate::io::{ - DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer, -}; -use crate::model::{GraphNameRef, Quad, Triple}; -use crate::storage::StorageWriter; -use std::io::{BufRead, Result, Write}; - -pub fn load_graph( - writer: &mut StorageWriter, - reader: impl BufRead, - format: GraphFormat, - to_graph_name: GraphNameRef<'_>, - base_iri: Option<&str>, -) -> Result<()> { - let mut parser = GraphParser::from_format(format); - if let Some(base_iri) = base_iri { - parser = parser - .with_base_iri(base_iri) - .map_err(invalid_input_error)?; - } - for t in parser.read_triples(reader)? { - writer.insert(t?.as_ref().in_graph(to_graph_name))?; - } - Ok(()) -} - -pub fn dump_graph( - triples: impl Iterator>, - writer: impl Write, - format: GraphFormat, -) -> Result<()> { - let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?; - for triple in triples { - writer.write(&triple?)?; - } - writer.finish() -} - -pub fn load_dataset( - writer: &mut StorageWriter, - reader: impl BufRead, - format: DatasetFormat, - base_iri: Option<&str>, -) -> Result<()> { - let mut parser = DatasetParser::from_format(format); - if let Some(base_iri) = base_iri { - parser = parser - .with_base_iri(base_iri) - .map_err(invalid_input_error)?; - } - for t in parser.read_quads(reader)? { - writer.insert(t?.as_ref())?; - } - Ok(()) -} - -pub fn dump_dataset( - quads: impl Iterator>, - writer: impl Write, - format: DatasetFormat, -) -> Result<()> { - let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?; - for quad in quads { - writer.write(&quad?)?; - } - writer.finish() -} diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 46c079f1..183fff34 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -12,7 +12,6 @@ use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; #[cfg(not(target_arch = "wasm32"))] use std::collections::{HashMap, HashSet}; use std::io::Result; -use std::mem::swap; #[cfg(not(target_arch = "wasm32"))] use std::mem::take; #[cfg(not(target_arch = "wasm32"))] @@ -22,7 +21,6 @@ use std::thread::spawn; mod backend; mod binary_encoder; -pub mod io; pub mod numeric_encoder; pub mod small_string; @@ -147,23 +145,25 @@ impl Storage { let mut version = this.ensure_version()?; if version == 0 { - let mut transaction = this.db.transaction(); - let mut size = 0; // We migrate to v1 + let mut graph_names = HashSet::new(); for quad in this.reader().quads() { let quad = quad?; if !quad.graph_name.is_default_graph() { - transaction.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?; - size += 1; - if size % BULK_LOAD_BATCH_SIZE == 0 { - let mut tr = this.db.transaction(); - swap(&mut transaction, &mut tr); - tr.commit()?; - } + graph_names.insert(quad.graph_name); } } - transaction.commit()?; - this.db.flush(&this.graphs_cf)?; + let mut graph_names = graph_names + .into_iter() + .map(|g| encode_term(&g)) + .collect::>(); + graph_names.sort_unstable(); + let mut stt_file = this.db.new_sst_file()?; + for k in graph_names { + stt_file.insert_empty(&k)?; + } + this.db + .write_stt_files(vec![(&this.graphs_cf, stt_file.finish()?)])?; version = 1; this.update_version(version)?; } @@ -195,9 +195,8 @@ impl Storage { } fn update_version(&self, version: u64) -> Result<()> { - let mut transaction = self.db.transaction(); - transaction.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; - transaction.commit()?; + self.db + .transaction(|t| t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()))?; self.db.flush(&self.default_cf) } @@ -217,12 +216,14 @@ impl Storage { } } - pub fn transaction(&self) -> StorageWriter { - StorageWriter { - buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE), - transaction: self.db.transaction(), - storage: self.clone(), - } + pub fn transaction(&self, f: impl Fn(StorageWriter<'_>) -> Result) -> Result { + self.db.transaction(|transaction| { + f(StorageWriter { + buffer: Vec::new(), + transaction, + storage: self, + }) + }) } #[cfg(not(target_arch = "wasm32"))] @@ -671,13 +672,13 @@ impl StrLookup for StorageReader { } } -pub struct StorageWriter { +pub struct StorageWriter<'a> { buffer: Vec, - transaction: Transaction, - storage: Storage, + transaction: &'a mut Transaction, + storage: &'a Storage, } -impl StorageWriter { +impl<'a> StorageWriter<'a> { pub fn reader(&self) -> StorageReader { StorageReader { reader: self.transaction.reader(), @@ -961,14 +962,6 @@ impl StorageWriter { } Ok(()) } - - pub fn commit(self) -> Result<()> { - self.transaction.commit() - } - - pub fn rollback(self) -> Result<()> { - self.transaction.rollback() - } } /// Creates a database from a dataset files. @@ -1061,7 +1054,7 @@ impl BulkLoader { .into_iter() .map(|(k, v)| (k.to_be_bytes(), v)) .collect::>(); - id2str.sort(); + id2str.sort_unstable(); let mut id2str_sst = self.storage.db.new_sst_file()?; for (k, v) in id2str { id2str_sst.insert(&k, v.as_bytes())?; @@ -1193,30 +1186,3 @@ impl BulkLoader { sst.finish() } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::model::NamedNodeRef; - - #[test] - fn test_transaction_isolation() -> Result<()> { - let quad = QuadRef::new( - NamedNodeRef::new_unchecked("http://example.com/s"), - NamedNodeRef::new_unchecked("http://example.com/p"), - NamedNodeRef::new_unchecked("http://example.com/o"), - NamedNodeRef::new_unchecked("http://example.com/g"), - ); - let storage = Storage::new()?; - let mut t1 = storage.transaction(); - let snapshot = storage.snapshot(); - t1.insert(quad)?; - t1.commit()?; - assert_eq!(snapshot.len()?, 0); - let mut t2 = storage.transaction(); - let mut t3 = storage.transaction(); - t2.insert(quad)?; - assert!(t3.remove(quad).is_err()); // Already locked - Ok(()) - } -} diff --git a/lib/src/store.rs b/lib/src/store.rs index a52cae1b..50388fa2 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -24,7 +24,9 @@ //! # Result::<_,Box>::Ok(()) //! ``` use crate::error::invalid_input_error; -use crate::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser}; +use crate::io::{ + DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer, +}; use crate::model::*; use crate::sparql::{ evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update, @@ -32,7 +34,6 @@ use crate::sparql::{ }; #[cfg(not(target_arch = "wasm32"))] use crate::storage::bulk_load; -use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader}; use std::io::{BufRead, Write}; @@ -237,7 +238,7 @@ impl Store { /// Loads a graph file (i.e. triples) into the store. /// - /// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`]. + /// This function is atomic and quite slow and memory hungry. To get much better performances you might want to use [`bulk_load_graph`]. /// /// Usage example: /// ``` @@ -267,14 +268,27 @@ impl Store { to_graph_name: impl Into>, base_iri: Option<&str>, ) -> io::Result<()> { - let mut writer = self.storage.transaction(); - load_graph(&mut writer, reader, format, to_graph_name.into(), base_iri)?; - writer.commit() + let mut parser = GraphParser::from_format(format); + if let Some(base_iri) = base_iri { + parser = parser + .with_base_iri(base_iri) + .map_err(invalid_input_error)?; + } + let quads = parser + .read_triples(reader)? + .collect::>>()?; + let to_graph_name = to_graph_name.into(); + self.storage.transaction(move |mut t| { + for quad in &quads { + t.insert(quad.as_ref().in_graph(to_graph_name))?; + } + Ok(()) + }) } /// Loads a dataset file (i.e. quads) into the store. /// - /// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`]. + /// This function is atomic and quite slow. To get much better performances you might want to [`bulk_load_dataset`]. /// /// Usage example: /// ``` @@ -303,9 +317,19 @@ impl Store { format: DatasetFormat, base_iri: Option<&str>, ) -> io::Result<()> { - let mut writer = self.storage.transaction(); - load_dataset(&mut writer, reader, format, base_iri)?; - writer.commit() + let mut parser = DatasetParser::from_format(format); + if let Some(base_iri) = base_iri { + parser = parser + .with_base_iri(base_iri) + .map_err(invalid_input_error)?; + } + let quads = parser.read_quads(reader)?.collect::>>()?; + self.storage.transaction(move |mut t| { + for quad in &quads { + t.insert(quad.into())?; + } + Ok(()) + }) } /// Adds a quad to this store. @@ -327,10 +351,21 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn insert<'a>(&self, quad: impl Into>) -> io::Result { - let mut writer = self.storage.transaction(); - let result = writer.insert(quad.into())?; - writer.commit()?; - Ok(result) + let quad = quad.into(); + self.storage.transaction(move |mut t| t.insert(quad)) + } + + /// Adds atomically a set of quads to this store. + /// + /// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`] if you plan to add ten of millions of triples. + pub fn extend(&self, quads: impl IntoIterator) -> io::Result<()> { + let quads = quads.into_iter().collect::>(); + self.storage.transaction(move |mut t| { + for quad in &quads { + t.insert(quad.into())?; + } + Ok(()) + }) } /// Removes a quad from this store. @@ -353,10 +388,8 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn remove<'a>(&self, quad: impl Into>) -> io::Result { - let mut writer = self.storage.transaction(); - let result = writer.remove(quad.into())?; - writer.commit()?; - Ok(result) + let quad = quad.into(); + self.storage.transaction(move |mut t| t.remove(quad)) } /// Dumps a store graph into a file. @@ -383,12 +416,11 @@ impl Store { format: GraphFormat, from_graph_name: impl Into>, ) -> io::Result<()> { - dump_graph( - self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) - .map(|q| Ok(q?.into())), - writer, - format, - ) + let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?; + for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) { + writer.write(quad?.as_ref())?; + } + writer.finish() } /// Dumps the store into a file. @@ -408,7 +440,11 @@ impl Store { /// # std::io::Result::Ok(()) /// ``` pub fn dump_dataset(&self, writer: impl Write, format: DatasetFormat) -> io::Result<()> { - dump_dataset(self.iter(), writer, format) + let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?; + for quad in self.iter() { + writer.write(&quad?)?; + } + writer.finish() } /// Returns all the store named graphs @@ -474,10 +510,9 @@ impl Store { &self, graph_name: impl Into>, ) -> io::Result { - let mut writer = self.storage.transaction(); - let result = writer.insert_named_graph(graph_name.into())?; - writer.commit()?; - Ok(result) + let graph_name = graph_name.into(); + self.storage + .transaction(move |mut t| t.insert_named_graph(graph_name)) } /// Clears a graph from this store. @@ -499,9 +534,9 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn clear_graph<'a>(&self, graph_name: impl Into>) -> io::Result<()> { - let mut writer = self.storage.transaction(); - writer.clear_graph(graph_name.into())?; - writer.commit() + let graph_name = graph_name.into(); + self.storage + .transaction(move |mut t| t.clear_graph(graph_name)) } /// Removes a graph from this store. @@ -528,10 +563,9 @@ impl Store { &self, graph_name: impl Into>, ) -> io::Result { - let mut writer = self.storage.transaction(); - let result = writer.remove_named_graph(graph_name.into())?; - writer.commit()?; - Ok(result) + let graph_name = graph_name.into(); + self.storage + .transaction(move |mut t| t.remove_named_graph(graph_name)) } /// Clears the store. @@ -552,9 +586,7 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn clear(&self) -> io::Result<()> { - let mut writer = self.storage.transaction(); - writer.clear()?; - writer.commit() + self.storage.transaction(|mut t| t.clear()) } /// Flushes all buffers and ensures that all writes are saved on disk. @@ -580,10 +612,12 @@ impl Store { /// Loads a dataset file efficiently into the store. /// - /// Warning: This function is optimized for speed and might eat a lot of memory. + /// This function is optimized for large dataset loading speed. For small files, [`load_dataset`] might be more convenient. /// /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. /// + /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. + /// /// Usage example: /// ``` /// use oxigraph::store::Store; @@ -623,10 +657,12 @@ impl Store { /// Loads a dataset file efficiently into the store. /// - /// Warning: This function is optimized for speed and might eat a lot of memory. + /// This function is optimized for large dataset loading speed. For small files, [`load_graph`] might be more convenient. /// /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. /// + /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. + /// /// Usage example: /// ``` /// use oxigraph::store::Store; @@ -669,6 +705,15 @@ impl Store { .map(|r| Ok(r?.in_graph(to_graph_name.into_owned()))), ) } + + /// Adds a set of triples to this store using bulk load. + /// + /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// + /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. + pub fn bulk_extend(&mut self, quads: impl IntoIterator) -> io::Result<()> { + bulk_load(&self.storage, quads.into_iter().map(Ok)) + } } impl fmt::Display for Store {