Automatically retries transactions on failure

pull/173/head
Tpt 3 years ago
parent eb4fb8254a
commit a3e40556da
  1. 2
      bench/bsbm_oxigraph.sh
  2. 5
      lib/src/sparql/error.rs
  3. 10
      lib/src/sparql/mod.rs
  4. 86
      lib/src/sparql/update.rs
  5. 35
      lib/src/storage/backend/rocksdb.rs
  6. 70
      lib/src/storage/io.rs
  7. 88
      lib/src/storage/mod.rs
  8. 127
      lib/src/store.rs

@ -7,7 +7,7 @@ cd bsbm-tools
cargo build --release --manifest-path="../../server/Cargo.toml"
./../../target/release/oxigraph_server --location oxigraph_data load --file "explore-${DATASET_SIZE}.nt"
./../../target/release/oxigraph_server --location oxigraph_data serve --bind 127.0.0.1:7878 &
sleep 5
sleep 1
curl -f -X POST -H 'Content-Type:application/n-triples' --data-binary "@explore-${DATASET_SIZE}.nt" http://127.0.0.1:7878/store?default
./testdriver -mt ${PARALLELISM} -ucf usecases/explore/sparql.txt -o "../bsbm.explore.oxigraph.${DATASET_SIZE}.${PARALLELISM}.main.xml" http://127.0.0.1:7878/query
./testdriver -mt ${PARALLELISM} -ucf usecases/exploreAndUpdate/sparql.txt -o "../bsbm.exploreAndUpdate.oxigraph.${DATASET_SIZE}.${PARALLELISM}.main.xml" http://127.0.0.1:7878/query -u http://127.0.0.1:7878/update -udataset "explore-update-${DATASET_SIZE}.nt"

@ -16,9 +16,8 @@ pub enum EvaluationError {
Io(io::Error),
/// An error returned during the query evaluation itself
Query(QueryError),
/// A conflict during a transaction
#[doc(hidden)]
Conflict,
Extra,
}
#[derive(Debug)]
@ -38,7 +37,7 @@ impl fmt::Display for EvaluationError {
Self::Parsing(error) => error.fmt(f),
Self::Io(error) => error.fmt(f),
Self::Query(error) => error.fmt(f),
Self::Conflict => write!(f, "Transaction conflict"),
Self::Extra => write!(f, "Unknown error"),
}
}
}

@ -29,7 +29,7 @@ pub use crate::sparql::model::{Variable, VariableNameParseError};
use crate::sparql::plan_builder::PlanBuilder;
pub use crate::sparql::service::ServiceHandler;
use crate::sparql::service::{EmptyServiceHandler, ErrorConversionServiceHandler};
use crate::sparql::update::SimpleUpdateEvaluator;
pub(crate) use crate::sparql::update::evaluate_update;
use crate::storage::Storage;
pub use spargebra::ParseError;
use std::rc::Rc;
@ -175,11 +175,3 @@ impl From<QueryOptions> for UpdateOptions {
Self { query_options }
}
}
pub(crate) fn evaluate_update(
storage: &Storage,
update: Update,
options: UpdateOptions,
) -> Result<(), EvaluationError> {
SimpleUpdateEvaluator::run(storage, update, options)
}

@ -1,4 +1,5 @@
use crate::io::GraphFormat;
use crate::error::invalid_input_error;
use crate::io::{GraphFormat, GraphParser};
use crate::model::{
BlankNode as OxBlankNode, GraphName as OxGraphName, GraphNameRef, Literal as OxLiteral,
NamedNode as OxNamedNode, NamedNodeRef, Quad as OxQuad, Term as OxTerm, Triple as OxTriple,
@ -10,7 +11,6 @@ use crate::sparql::http::Client;
use crate::sparql::plan::EncodedTuple;
use crate::sparql::plan_builder::PlanBuilder;
use crate::sparql::{EvaluationError, Update, UpdateOptions};
use crate::storage::io::load_graph;
use crate::storage::numeric_encoder::{Decoder, EncodedTerm};
use crate::storage::{Storage, StorageWriter};
use oxiri::Iri;
@ -23,42 +23,50 @@ use spargebra::term::{
};
use spargebra::GraphUpdateOperation;
use std::collections::HashMap;
use std::io;
use std::io::BufReader;
use std::io::{BufReader, Error, ErrorKind};
use std::rc::Rc;
pub struct SimpleUpdateEvaluator {
transaction: StorageWriter,
base_iri: Option<Rc<Iri<String>>>,
options: UpdateOptions,
client: Client,
}
impl SimpleUpdateEvaluator {
pub fn run(
pub fn evaluate_update(
storage: &Storage,
update: Update,
options: UpdateOptions,
) -> Result<(), EvaluationError> {
) -> Result<(), EvaluationError> {
let base_iri = update.inner.base_iri.map(Rc::new);
let client = Client::new(options.query_options.http_timeout);
let mut evaluator = Self {
transaction: storage.transaction(),
base_iri: update.inner.base_iri.map(Rc::new),
options,
client,
};
match evaluator.eval_all(&update.inner.operations, &update.using_datasets) {
Ok(_) => {
evaluator.transaction.commit()?;
Ok(())
}
Err(e) => {
evaluator.transaction.rollback()?;
Err(e)
}
}
storage
.transaction(move |transaction| {
SimpleUpdateEvaluator {
transaction,
base_iri: base_iri.clone(),
options: &options,
client: &client,
}
.eval_all(&update.inner.operations, &update.using_datasets)
.map_err(|e| match e {
EvaluationError::Io(e) => e,
q => Error::new(ErrorKind::Other, q),
})
})
.map_err(|e| {
if e.get_ref()
.map_or(false, |inner| inner.is::<EvaluationError>())
{
*e.into_inner().unwrap().downcast().unwrap()
} else {
EvaluationError::Io(e)
}
})?;
Ok(())
}
struct SimpleUpdateEvaluator<'a> {
transaction: StorageWriter<'a>,
base_iri: Option<Rc<Iri<String>>>,
options: &'a UpdateOptions,
client: &'a Client,
}
impl SimpleUpdateEvaluator<'_> {
fn eval_all(
&mut self,
updates: &[GraphUpdateOperation],
@ -173,14 +181,16 @@ impl SimpleUpdateEvaluator {
GraphName::NamedNode(graph_name) => NamedNodeRef::new_unchecked(&graph_name.iri).into(),
GraphName::DefaultGraph => GraphNameRef::DefaultGraph,
};
load_graph(
&mut self.transaction,
BufReader::new(body),
format,
to_graph_name,
Some(&from.iri),
)
.map_err(io::Error::from)?;
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = &self.base_iri {
parser = parser
.with_base_iri(base_iri.as_str())
.map_err(invalid_input_error)?;
}
for t in parser.read_triples(BufReader::new(body))? {
self.transaction
.insert(t?.as_ref().in_graph(to_graph_name))?;
}
Ok(())
}

@ -351,8 +351,33 @@ impl Db {
}
}
pub fn transaction<T>(&self, f: impl Fn(&mut Transaction) -> Result<T>) -> Result<T> {
loop {
let mut transaction = self.build_transaction();
match { f(&mut transaction) } {
Ok(result) => {
transaction.commit()?;
return Ok(result);
}
Err(e) => {
transaction.rollback()?;
let is_conflict_error = e.get_ref().map_or(false, |e| {
let msg = e.to_string();
msg == "Resource busy: "
|| msg == "Operation timed out: Timeout waiting to lock key"
});
if is_conflict_error {
// We retry
continue;
}
return Err(e);
}
}
}
}
#[must_use]
pub fn transaction(&self) -> Transaction {
fn build_transaction(&self) -> Transaction {
unsafe {
let transaction = rocksdb_transaction_begin(
self.0.db,
@ -638,12 +663,12 @@ impl Drop for InnerTransaction {
}
impl Transaction {
pub fn commit(self) -> Result<()> {
fn commit(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_commit(self.inner.transaction)) }
}
pub fn rollback(self) -> Result<()> {
fn rollback(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_rollback(self.inner.transaction)) }
}
@ -877,7 +902,7 @@ fn convert_error(ptr: *const c_char) -> Error {
}
fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Error {
Error::new(ErrorKind::InvalidInput, error)
Error::new(ErrorKind::Other, error)
}
struct UnsafeEnv(*mut rocksdb_env_t);
@ -897,7 +922,7 @@ fn path_to_cstring(path: &Path) -> Result<CString> {
fn test_transaction_read_after_commit() -> Result<()> {
let db = Db::new(vec![])?;
let cf = db.column_family("default").unwrap();
let mut tr = db.transaction();
let mut tr = db.build_transaction();
let reader = tr.reader();
tr.insert(&cf, b"test", b"foo")?;
assert_eq!(reader.get(&cf, b"test")?.as_deref(), Some(b"foo".as_ref()));

@ -1,70 +0,0 @@
//! Utilities for I/O from the store
use crate::error::invalid_input_error;
use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
use crate::model::{GraphNameRef, Quad, Triple};
use crate::storage::StorageWriter;
use std::io::{BufRead, Result, Write};
pub fn load_graph(
writer: &mut StorageWriter,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: GraphNameRef<'_>,
base_iri: Option<&str>,
) -> Result<()> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
for t in parser.read_triples(reader)? {
writer.insert(t?.as_ref().in_graph(to_graph_name))?;
}
Ok(())
}
pub fn dump_graph(
triples: impl Iterator<Item = Result<Triple>>,
writer: impl Write,
format: GraphFormat,
) -> Result<()> {
let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?;
for triple in triples {
writer.write(&triple?)?;
}
writer.finish()
}
pub fn load_dataset(
writer: &mut StorageWriter,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> Result<()> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
for t in parser.read_quads(reader)? {
writer.insert(t?.as_ref())?;
}
Ok(())
}
pub fn dump_dataset(
quads: impl Iterator<Item = Result<Quad>>,
writer: impl Write,
format: DatasetFormat,
) -> Result<()> {
let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?;
for quad in quads {
writer.write(&quad?)?;
}
writer.finish()
}

@ -12,7 +12,6 @@ use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(not(target_arch = "wasm32"))]
use std::collections::{HashMap, HashSet};
use std::io::Result;
use std::mem::swap;
#[cfg(not(target_arch = "wasm32"))]
use std::mem::take;
#[cfg(not(target_arch = "wasm32"))]
@ -22,7 +21,6 @@ use std::thread::spawn;
mod backend;
mod binary_encoder;
pub mod io;
pub mod numeric_encoder;
pub mod small_string;
@ -147,23 +145,25 @@ impl Storage {
let mut version = this.ensure_version()?;
if version == 0 {
let mut transaction = this.db.transaction();
let mut size = 0;
// We migrate to v1
let mut graph_names = HashSet::new();
for quad in this.reader().quads() {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
transaction.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?;
size += 1;
if size % BULK_LOAD_BATCH_SIZE == 0 {
let mut tr = this.db.transaction();
swap(&mut transaction, &mut tr);
tr.commit()?;
graph_names.insert(quad.graph_name);
}
}
let mut graph_names = graph_names
.into_iter()
.map(|g| encode_term(&g))
.collect::<Vec<_>>();
graph_names.sort_unstable();
let mut stt_file = this.db.new_sst_file()?;
for k in graph_names {
stt_file.insert_empty(&k)?;
}
transaction.commit()?;
this.db.flush(&this.graphs_cf)?;
this.db
.write_stt_files(vec![(&this.graphs_cf, stt_file.finish()?)])?;
version = 1;
this.update_version(version)?;
}
@ -195,9 +195,8 @@ impl Storage {
}
fn update_version(&self, version: u64) -> Result<()> {
let mut transaction = self.db.transaction();
transaction.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
transaction.commit()?;
self.db
.transaction(|t| t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()))?;
self.db.flush(&self.default_cf)
}
@ -217,12 +216,14 @@ impl Storage {
}
}
pub fn transaction(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
transaction: self.db.transaction(),
storage: self.clone(),
}
pub fn transaction<T>(&self, f: impl Fn(StorageWriter<'_>) -> Result<T>) -> Result<T> {
self.db.transaction(|transaction| {
f(StorageWriter {
buffer: Vec::new(),
transaction,
storage: self,
})
})
}
#[cfg(not(target_arch = "wasm32"))]
@ -671,13 +672,13 @@ impl StrLookup for StorageReader {
}
}
pub struct StorageWriter {
pub struct StorageWriter<'a> {
buffer: Vec<u8>,
transaction: Transaction,
storage: Storage,
transaction: &'a mut Transaction,
storage: &'a Storage,
}
impl StorageWriter {
impl<'a> StorageWriter<'a> {
pub fn reader(&self) -> StorageReader {
StorageReader {
reader: self.transaction.reader(),
@ -961,14 +962,6 @@ impl StorageWriter {
}
Ok(())
}
pub fn commit(self) -> Result<()> {
self.transaction.commit()
}
pub fn rollback(self) -> Result<()> {
self.transaction.rollback()
}
}
/// Creates a database from a dataset files.
@ -1061,7 +1054,7 @@ impl BulkLoader {
.into_iter()
.map(|(k, v)| (k.to_be_bytes(), v))
.collect::<Vec<_>>();
id2str.sort();
id2str.sort_unstable();
let mut id2str_sst = self.storage.db.new_sst_file()?;
for (k, v) in id2str {
id2str_sst.insert(&k, v.as_bytes())?;
@ -1193,30 +1186,3 @@ impl BulkLoader {
sst.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::model::NamedNodeRef;
#[test]
fn test_transaction_isolation() -> Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let storage = Storage::new()?;
let mut t1 = storage.transaction();
let snapshot = storage.snapshot();
t1.insert(quad)?;
t1.commit()?;
assert_eq!(snapshot.len()?, 0);
let mut t2 = storage.transaction();
let mut t3 = storage.transaction();
t2.insert(quad)?;
assert!(t3.remove(quad).is_err()); // Already locked
Ok(())
}
}

@ -24,7 +24,9 @@
//! # Result::<_,Box<dyn std::error::Error>>::Ok(())
//! ```
use crate::error::invalid_input_error;
use crate::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser};
use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
use crate::model::*;
use crate::sparql::{
evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
@ -32,7 +34,6 @@ use crate::sparql::{
};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::bulk_load;
use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader};
use std::io::{BufRead, Write};
@ -237,7 +238,7 @@ impl Store {
/// Loads a graph file (i.e. triples) into the store.
///
/// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`].
/// This function is atomic and quite slow and memory hungry. To get much better performances you might want to use [`bulk_load_graph`].
///
/// Usage example:
/// ```
@ -267,14 +268,27 @@ impl Store {
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
let mut writer = self.storage.transaction();
load_graph(&mut writer, reader, format, to_graph_name.into(), base_iri)?;
writer.commit()
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
let quads = parser
.read_triples(reader)?
.collect::<io::Result<Vec<_>>>()?;
let to_graph_name = to_graph_name.into();
self.storage.transaction(move |mut t| {
for quad in &quads {
t.insert(quad.as_ref().in_graph(to_graph_name))?;
}
Ok(())
})
}
/// Loads a dataset file (i.e. quads) into the store.
///
/// This function is atomic and quite slow. To get much better performances you should use [`create_from_dataset`].
/// This function is atomic and quite slow. To get much better performances you might want to [`bulk_load_dataset`].
///
/// Usage example:
/// ```
@ -303,9 +317,19 @@ impl Store {
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
let mut writer = self.storage.transaction();
load_dataset(&mut writer, reader, format, base_iri)?;
writer.commit()
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
let quads = parser.read_quads(reader)?.collect::<io::Result<Vec<_>>>()?;
self.storage.transaction(move |mut t| {
for quad in &quads {
t.insert(quad.into())?;
}
Ok(())
})
}
/// Adds a quad to this store.
@ -327,10 +351,21 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let mut writer = self.storage.transaction();
let result = writer.insert(quad.into())?;
writer.commit()?;
Ok(result)
let quad = quad.into();
self.storage.transaction(move |mut t| t.insert(quad))
}
/// Adds atomically a set of quads to this store.
///
/// Warning: This operation uses a memory heavy transaction internally, use [`bulk_extend`] if you plan to add ten of millions of triples.
pub fn extend(&self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
let quads = quads.into_iter().collect::<Vec<_>>();
self.storage.transaction(move |mut t| {
for quad in &quads {
t.insert(quad.into())?;
}
Ok(())
})
}
/// Removes a quad from this store.
@ -353,10 +388,8 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
let mut writer = self.storage.transaction();
let result = writer.remove(quad.into())?;
writer.commit()?;
Ok(result)
let quad = quad.into();
self.storage.transaction(move |mut t| t.remove(quad))
}
/// Dumps a store graph into a file.
@ -383,12 +416,11 @@ impl Store {
format: GraphFormat,
from_graph_name: impl Into<GraphNameRef<'a>>,
) -> io::Result<()> {
dump_graph(
self.quads_for_pattern(None, None, None, Some(from_graph_name.into()))
.map(|q| Ok(q?.into())),
writer,
format,
)
let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?;
for quad in self.quads_for_pattern(None, None, None, Some(from_graph_name.into())) {
writer.write(quad?.as_ref())?;
}
writer.finish()
}
/// Dumps the store into a file.
@ -408,7 +440,11 @@ impl Store {
/// # std::io::Result::Ok(())
/// ```
pub fn dump_dataset(&self, writer: impl Write, format: DatasetFormat) -> io::Result<()> {
dump_dataset(self.iter(), writer, format)
let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?;
for quad in self.iter() {
writer.write(&quad?)?;
}
writer.finish()
}
/// Returns all the store named graphs
@ -474,10 +510,9 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let mut writer = self.storage.transaction();
let result = writer.insert_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.insert_named_graph(graph_name))
}
/// Clears a graph from this store.
@ -499,9 +534,9 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> io::Result<()> {
let mut writer = self.storage.transaction();
writer.clear_graph(graph_name.into())?;
writer.commit()
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.clear_graph(graph_name))
}
/// Removes a graph from this store.
@ -528,10 +563,9 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
let mut writer = self.storage.transaction();
let result = writer.remove_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
let graph_name = graph_name.into();
self.storage
.transaction(move |mut t| t.remove_named_graph(graph_name))
}
/// Clears the store.
@ -552,9 +586,7 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear(&self) -> io::Result<()> {
let mut writer = self.storage.transaction();
writer.clear()?;
writer.commit()
self.storage.transaction(|mut t| t.clear())
}
/// Flushes all buffers and ensures that all writes are saved on disk.
@ -580,10 +612,12 @@ impl Store {
/// Loads a dataset file efficiently into the store.
///
/// Warning: This function is optimized for speed and might eat a lot of memory.
/// This function is optimized for large dataset loading speed. For small files, [`load_dataset`] might be more convenient.
///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
@ -623,10 +657,12 @@ impl Store {
/// Loads a dataset file efficiently into the store.
///
/// Warning: This function is optimized for speed and might eat a lot of memory.
/// This function is optimized for large dataset loading speed. For small files, [`load_graph`] might be more convenient.
///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
@ -669,6 +705,15 @@ impl Store {
.map(|r| Ok(r?.in_graph(to_graph_name.into_owned()))),
)
}
/// Adds a set of triples to this store using bulk load.
///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
pub fn bulk_extend(&mut self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
bulk_load(&self.storage, quads.into_iter().map(Ok))
}
}
impl fmt::Display for Store {

Loading…
Cancel
Save