diff --git a/lib/src/sparql/dataset.rs b/lib/src/sparql/dataset.rs index 3649316b..79ed807a 100644 --- a/lib/src/sparql/dataset.rs +++ b/lib/src/sparql/dataset.rs @@ -6,7 +6,6 @@ use crate::storage::Storage; use std::cell::RefCell; use std::collections::hash_map::Entry; use std::collections::HashMap; -use std::convert::Infallible; use std::iter::empty; pub struct DatasetView { @@ -141,11 +140,9 @@ impl DatasetView { pub fn encode_term<'a>(&self, term: impl Into>) -> EncodedTerm { let term = term.into(); let encoded = term.into(); - insert_term::(term, &encoded, |key, value| { - self.insert_str(key, value); - Ok(()) - }) - .unwrap(); // Can not fail + insert_term(term, &encoded, &mut |key, value| { + self.insert_str(key, value) + }); encoded } diff --git a/lib/src/sparql/update.rs b/lib/src/sparql/update.rs index 2416f309..1bab649a 100644 --- a/lib/src/sparql/update.rs +++ b/lib/src/sparql/update.rs @@ -12,7 +12,7 @@ use crate::sparql::plan_builder::PlanBuilder; use crate::sparql::{EvaluationError, UpdateOptions}; use crate::storage::io::load_graph; use crate::storage::numeric_encoder::{Decoder, EncodedTerm}; -use crate::storage::Storage; +use crate::storage::{Storage, StorageWriter}; use oxiri::Iri; use spargebra::algebra::{GraphPattern, GraphTarget}; use spargebra::term::{ @@ -29,6 +29,7 @@ use std::rc::Rc; pub struct SimpleUpdateEvaluator<'a> { storage: &'a Storage, + writer: StorageWriter, base_iri: Option>>, options: UpdateOptions, client: Client, @@ -41,8 +42,10 @@ impl<'a> SimpleUpdateEvaluator<'a> { options: UpdateOptions, ) -> Self { let client = Client::new(options.query_options.http_timeout); + let writer = storage.atomic_writer(); Self { storage, + writer, base_iri, options, client, @@ -56,6 +59,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { ) -> Result<(), EvaluationError> { for (update, using_dataset) in updates.iter().zip(using_datasets) { self.eval(update, using_dataset)?; + self.writer.commit()?; } Ok(()) } @@ -99,7 +103,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { let mut bnodes = HashMap::new(); for quad in data { let quad = Self::convert_quad(quad, &mut bnodes); - self.storage.insert(quad.as_ref())?; + self.writer.insert(quad.as_ref())?; } Ok(()) } @@ -107,7 +111,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { fn eval_delete_data(&mut self, data: &[GroundQuad]) -> Result<(), EvaluationError> { for quad in data { let quad = Self::convert_ground_quad(quad); - self.storage.remove(quad.as_ref())?; + self.writer.remove(quad.as_ref())?; } Ok(()) } @@ -133,7 +137,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { if let Some(quad) = Self::convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)? { - self.storage.remove(quad.as_ref())?; + self.writer.remove(quad.as_ref())?; if !insert.is_empty() { // Hack to make sure the triple terms are still available for an insert dataset.encode_term(quad.subject.as_ref()); @@ -146,7 +150,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { if let Some(quad) = Self::convert_quad_pattern(quad, &variables, &tuple, &dataset, &mut bnodes)? { - self.storage.insert(quad.as_ref())?; + self.writer.insert(quad.as_ref())?; } } bnodes.clear(); @@ -170,7 +174,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { GraphName::DefaultGraph => GraphNameRef::DefaultGraph, }; load_graph( - self.storage, + &mut self.writer, BufReader::new(body), format, to_graph_name, @@ -182,7 +186,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { fn eval_create(&mut self, graph_name: &NamedNode, silent: bool) -> Result<(), EvaluationError> { let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri); - if self.storage.insert_named_graph(graph_name.into())? || silent { + if self.writer.insert_named_graph(graph_name.into())? || silent { Ok(()) } else { Err(EvaluationError::msg(format!( @@ -197,7 +201,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { GraphTarget::NamedNode(graph_name) => { let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri); if self.storage.contains_named_graph(&graph_name.into())? { - Ok(self.storage.clear_graph(graph_name.into())?) + Ok(self.writer.clear_graph(graph_name.into())?) } else if silent { Ok(()) } else { @@ -208,11 +212,11 @@ impl<'a> SimpleUpdateEvaluator<'a> { } } GraphTarget::DefaultGraph => { - self.storage.clear_graph(GraphNameRef::DefaultGraph)?; + self.writer.clear_graph(GraphNameRef::DefaultGraph)?; Ok(()) } - GraphTarget::NamedGraphs => Ok(self.storage.clear_all_named_graphs()?), - GraphTarget::AllGraphs => Ok(self.storage.clear_all_graphs()?), + GraphTarget::NamedGraphs => Ok(self.writer.clear_all_named_graphs()?), + GraphTarget::AllGraphs => Ok(self.writer.clear_all_graphs()?), } } @@ -220,7 +224,7 @@ impl<'a> SimpleUpdateEvaluator<'a> { match graph { GraphTarget::NamedNode(graph_name) => { let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri); - if self.storage.remove_named_graph(graph_name.into())? || silent { + if self.writer.remove_named_graph(graph_name.into())? || silent { Ok(()) } else { Err(EvaluationError::msg(format!( @@ -229,11 +233,9 @@ impl<'a> SimpleUpdateEvaluator<'a> { ))) } } - GraphTarget::DefaultGraph => { - Ok(self.storage.clear_graph(GraphNameRef::DefaultGraph)?) - } - GraphTarget::NamedGraphs => Ok(self.storage.remove_all_named_graphs()?), - GraphTarget::AllGraphs => Ok(self.storage.clear()?), + GraphTarget::DefaultGraph => Ok(self.writer.clear_graph(GraphNameRef::DefaultGraph)?), + GraphTarget::NamedGraphs => Ok(self.writer.remove_all_named_graphs()?), + GraphTarget::AllGraphs => Ok(self.writer.clear()?), } } diff --git a/lib/src/storage/backend/fallback.rs b/lib/src/storage/backend/fallback.rs index 9e005e50..b1777929 100644 --- a/lib/src/storage/backend/fallback.rs +++ b/lib/src/storage/backend/fallback.rs @@ -1,43 +1,56 @@ //! TODO: This storage is dramatically naive. -use crate::storage::backend::{ColumnFamilyDefinition, CompactionAction, CompactionFilter}; -use std::collections::btree_map::Entry; -use std::collections::BTreeMap; +use crate::error::invalid_input_error; +use crate::storage::backend::{CompactionAction, CompactionFilter}; +use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::ffi::CString; -use std::io::Result; -use std::iter::{once, Once}; +use std::io::{Error, Result}; use std::sync::{Arc, RwLock}; +pub struct ColumnFamilyDefinition { + pub name: &'static str, + pub merge_operator: Option, + pub compaction_filter: Option, + pub use_iter: bool, + pub min_prefix_size: usize, +} + #[derive(Clone)] -pub struct Db(Arc>>); +pub struct Db(Arc); #[derive(Default)] -struct Tree { - tree: BTreeMap, Vec>, - merge_operator: Option, - compaction_filter: Option, +struct DbInternals { + trees: RwLock, Vec>>>, + merge_operators: HashMap, + compaction_filters: HashMap, } impl Db { pub fn new(column_families: Vec) -> Result { - let mut trees = BTreeMap::new(); + let mut trees = HashMap::new(); + let mut merge_operators = HashMap::new(); + let mut compaction_filters = HashMap::new(); for cf in column_families { - trees.insert( - ColumnFamily(cf.name), - Tree { - tree: BTreeMap::default(), - merge_operator: cf.merge_operator, - compaction_filter: cf.compaction_filter, - }, - ); + let name = ColumnFamily(cf.name); + trees.insert(name.clone(), BTreeMap::default()); + if let Some(me) = cf.merge_operator { + merge_operators.insert(name.clone(), me); + } + if let Some(cf) = cf.compaction_filter { + compaction_filters.insert(name.clone(), cf); + } } trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. - Ok(Self(Arc::new(RwLock::new(trees)))) + Ok(Self(Arc::new(DbInternals { + trees: RwLock::new(trees), + merge_operators, + compaction_filters, + }))) } pub fn column_family(&self, name: &'static str) -> Option { let name = ColumnFamily(name); - if self.0.read().unwrap().contains_key(&name) { + if self.0.trees.read().unwrap().contains_key(&name) { Some(name) } else { None @@ -51,27 +64,55 @@ impl Db { pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result>> { Ok(self .0 + .trees .read() .unwrap() .get(column_family) - .unwrap() - .tree - .get(key) - .map(|v| v.to_vec())) + .and_then(|cf| cf.get(key).cloned())) } pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result { Ok(self .0 + .trees .read() .unwrap() .get(column_family) - .unwrap() - .tree - .contains_key(key.as_ref())) + .map_or(false, |cf| cf.contains_key(key))) + } + + pub fn new_batch(&self) -> WriteBatchWithIndex { + WriteBatchWithIndex { + by_cf: HashMap::new(), + db: self.clone(), + error: None, + } } - pub fn insert( + pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> { + if let Some(error) = batch.error.take() { + return Err(error); + } + let mut trees = self.0.trees.write().unwrap(); + for (cf, ops) in batch.by_cf.drain() { + let tree = trees.get_mut(&cf).ok_or_else(|| { + invalid_input_error(format!("Unsupported column family {}", cf.0)) + })?; + for k in ops.to_remove { + tree.remove(&k); + } + for (k, v) in ops.to_insert { + tree.insert(k, v); + } + for (k, v) in ops.to_merge { + let v = self.exec_merge(&cf, &k, tree.get(&k).map(|v| v.as_slice()), &v)?; + tree.insert(k, v); + } + } + Ok(()) + } + + /*pub fn insert( &self, column_family: &ColumnFamily, key: &[u8], @@ -80,14 +121,10 @@ impl Db { ) -> Result<()> { let mut db = self.0.write().unwrap(); let tree = db.get_mut(column_family).unwrap(); - let action = if let Some(filter) = &tree.compaction_filter { - (filter.filter)(key, value) + if let Some(value) = Self::exec_filter(tree, key, value.into()) { + tree.tree.insert(key.into(), value.into()) } else { - CompactionAction::Keep - }; - match action { - CompactionAction::Keep => tree.tree.insert(key.into(), value.into()), - CompactionAction::Remove => tree.tree.remove(key), + tree.tree.remove(key) }; Ok(()) } @@ -129,41 +166,61 @@ impl Db { let tree = db.get_mut(column_family).unwrap(); match tree.tree.entry(key.into()) { Entry::Vacant(e) => { - let value = if let Some(merge) = &tree.merge_operator { - (merge.full)(key, None, once(value)) - } else { - value.into() - }; - let action = if let Some(filter) = &tree.compaction_filter { - (filter.filter)(key, &value) - } else { - CompactionAction::Keep - }; - match action { - CompactionAction::Keep => { - e.insert(value); - } - CompactionAction::Remove => (), + if let Some(value) = + Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value)) + { + e.insert(value); } } Entry::Occupied(mut e) => { - let value = if let Some(merge) = &tree.merge_operator { - (merge.full)(key, Some(&e.get()), once(value)) + if let Some(value) = + Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value)) + { + e.insert(value); } else { - value.into() - }; - let action = if let Some(filter) = &tree.compaction_filter { - (filter.filter)(key, &value) - } else { - CompactionAction::Keep - }; - match action { - CompactionAction::Keep => e.insert(value), - CompactionAction::Remove => e.remove(), - }; + e.remove(); + } } } Ok(()) + }*/ + + fn exec_merge( + &self, + cf: &ColumnFamily, + key: &[u8], + base: Option<&[u8]>, + value: &[u8], + ) -> Result> { + let merge = self.0.merge_operators.get(cf).ok_or_else(|| { + invalid_input_error(format!("The column family {} has no merge operator", cf.0)) + })?; + Ok((merge.full)(key, base, vec![value].into_iter())) + } + + fn exec_partial_merge( + &self, + cf: &ColumnFamily, + key: &[u8], + a: &[u8], + b: &[u8], + ) -> Result> { + let merge = self.0.merge_operators.get(cf).ok_or_else(|| { + invalid_input_error(format!("The column family {} has no merge operator", cf.0)) + })?; + Ok((merge.partial)(key, vec![a, b].into_iter())) + } + + fn exec_filter(&self, cf: &ColumnFamily, key: &[u8], value: Vec) -> Option> { + let action = if let Some(filter) = self.0.compaction_filters.get(cf) { + (filter.filter)(key, &value) + } else { + CompactionAction::Keep + }; + match action { + CompactionAction::Keep => Some(value), + CompactionAction::Remove => None, + } } pub fn iter(&self, column_family: &ColumnFamily) -> Iter { @@ -171,8 +228,15 @@ impl Db { } pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter { - let trees = self.0.read().unwrap(); - let tree = &trees.get(column_family).unwrap().tree; + let trees = self.0.trees.read().unwrap(); + let tree = if let Some(tree) = trees.get(column_family) { + tree + } else { + return Iter { + iter: Vec::new().into_iter(), + current: None, + }; + }; let data: Vec<_> = if prefix.is_empty() { tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect() } else { @@ -189,29 +253,119 @@ impl Db { pub fn len(&self, column_family: &ColumnFamily) -> Result { Ok(self .0 + .trees .read() .unwrap() .get(column_family) - .unwrap() - .tree - .len()) + .map_or(0, |tree| tree.len())) } pub fn is_empty(&self, column_family: &ColumnFamily) -> Result { Ok(self .0 + .trees .read() .unwrap() .get(column_family) - .unwrap() - .tree - .is_empty()) + .map_or(true, |tree| tree.is_empty())) } } -#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)] +#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)] pub struct ColumnFamily(&'static str); +pub struct WriteBatchWithIndex { + by_cf: HashMap, + db: Db, + error: Option, +} + +#[derive(Default)] +struct WriteBatchWithIndexCF { + // Evaluation order insert/remove then merge + to_insert: HashMap, Vec>, + to_merge: HashMap, Vec>, + to_remove: HashSet>, +} + +impl WriteBatchWithIndex { + pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) { + let cf_state = self.by_cf.entry(column_family.clone()).or_default(); + cf_state.to_insert.insert(key.into(), value.into()); + cf_state.to_merge.remove(key); + cf_state.to_remove.remove(key); + } + + pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) { + self.insert(column_family, key, &[]) + } + + pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) { + let cf_state = self.by_cf.entry(column_family.clone()).or_default(); + cf_state.to_insert.remove(key); + cf_state.to_merge.remove(key); + cf_state.to_remove.insert(key.into()); + } + + pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) { + let cf_state = self.by_cf.entry(column_family.clone()).or_default(); + match cf_state.to_merge.entry(key.into()) { + hash_map::Entry::Vacant(e) => { + e.insert(value.into()); + } + hash_map::Entry::Occupied(mut e) => { + match self + .db + .exec_partial_merge(column_family, key, e.get(), value) + { + Ok(value) => { + e.insert(value); + } + Err(e) => self.error = Some(e), + } + } + } + } + + pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result>> { + if let Some(cf_state) = self.by_cf.get(column_family) { + let value = if cf_state.to_remove.contains(key) { + None + } else if let Some(value) = cf_state.to_insert.get(key) { + Some(value.clone()) + } else { + self.db.get(column_family, key)? + }; + Ok(if let Some(merge) = cf_state.to_merge.get(key) { + Some( + self.db + .exec_merge(column_family, key, value.as_deref(), merge)?, + ) + } else { + value + } + .and_then(|value| self.db.exec_filter(column_family, key, value))) + } else { + self.db.get(column_family, key) + } + } + + pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result { + Ok(self.get(column_family, key)?.is_some()) //TODO: optimize + } + + pub fn clear(&mut self) { + self.by_cf.clear(); + } + + pub fn len(&self) -> usize { + self.by_cf + .values() + .map(|v| v.to_insert.len() + v.to_remove.len() + v.to_merge.len()) + .sum() + } +} + pub struct Iter { iter: std::vec::IntoIter<(Vec, Vec)>, current: Option<(Vec, Vec)>, @@ -241,4 +395,4 @@ pub struct MergeOperator { pub name: CString, } -pub type SlicesIterator<'a> = Once<&'a [u8]>; +pub type SlicesIterator<'a> = std::vec::IntoIter<&'a [u8]>; diff --git a/lib/src/storage/backend/mod.rs b/lib/src/storage/backend/mod.rs index 7bacbbe6..5e275c9e 100644 --- a/lib/src/storage/backend/mod.rs +++ b/lib/src/storage/backend/mod.rs @@ -2,9 +2,13 @@ //! RocksDB is available, if not in memory #[cfg(target_arch = "wasm32")] -pub use fallback::{ColumnFamily, Db, Iter, MergeOperator}; +pub use fallback::{ + ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex, +}; #[cfg(not(target_arch = "wasm32"))] -pub use rocksdb::{ColumnFamily, Db, Iter, MergeOperator}; +pub use rocksdb::{ + ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex, +}; use std::ffi::CString; #[cfg(target_arch = "wasm32")] @@ -12,14 +16,6 @@ mod fallback; #[cfg(not(target_arch = "wasm32"))] mod rocksdb; -pub struct ColumnFamilyDefinition { - pub name: &'static str, - pub merge_operator: Option, - pub compaction_filter: Option, - pub use_iter: bool, - pub min_prefix_size: usize, -} - pub struct CompactionFilter { pub filter: fn(&[u8], &[u8]) -> CompactionAction, pub name: CString, diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 49b97b6d..3a55a273 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -5,8 +5,8 @@ #![allow(unsafe_code)] use crate::error::invalid_input_error; -use crate::storage::backend::{ColumnFamilyDefinition, CompactionAction, CompactionFilter}; -use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; +use crate::storage::backend::{CompactionAction, CompactionFilter}; +use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; use oxrocksdb_sys::*; use std::borrow::Borrow; use std::env::temp_dir; @@ -40,6 +40,14 @@ macro_rules! ffi_result_impl { }} } +pub struct ColumnFamilyDefinition { + pub name: &'static str, + pub merge_operator: Option, + pub compaction_filter: Option, + pub use_iter: bool, + pub min_prefix_size: usize, +} + #[derive(Clone)] pub struct Db(Arc); @@ -51,7 +59,6 @@ struct DbHandler { options: *mut rocksdb_options_t, read_options: *mut rocksdb_readoptions_t, write_options: *mut rocksdb_writeoptions_t, - low_priority_write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, compaction_options: *mut rocksdb_compactoptions_t, env: Option<*mut rocksdb_env_t>, @@ -73,7 +80,6 @@ impl Drop for DbHandler { } rocksdb_readoptions_destroy(self.read_options); rocksdb_writeoptions_destroy(self.write_options); - rocksdb_writeoptions_destroy(self.low_priority_write_options); rocksdb_flushoptions_destroy(self.flush_options); rocksdb_compactoptions_destroy(self.compaction_options); rocksdb_options_destroy(self.options); @@ -267,13 +273,6 @@ impl Db { rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL } - let low_priority_write_options = rocksdb_writeoptions_create_copy(write_options); - assert!( - !low_priority_write_options.is_null(), - "rocksdb_writeoptions_create_copy returned null" - ); - rocksdb_writeoptions_set_low_pri(low_priority_write_options, 1); - let flush_options = rocksdb_flushoptions_create(); assert!( !flush_options.is_null(), @@ -291,7 +290,6 @@ impl Db { options, read_options, write_options, - low_priority_write_options, flush_options, compaction_options, env, @@ -359,82 +357,27 @@ impl Db { Ok(self.get(column_family, key)?.is_some()) //TODO: optimize } - pub fn insert( - &self, - column_family: &ColumnFamily, - key: &[u8], - value: &[u8], - low_priority: bool, - ) -> Result<()> { + pub fn new_batch(&self) -> WriteBatchWithIndex { unsafe { - ffi_result!(rocksdb_put_cf( - self.0.db, - if low_priority { - self.0.low_priority_write_options - } else { - self.0.write_options - }, - column_family.0, - key.as_ptr() as *const c_char, - key.len(), - value.as_ptr() as *const c_char, - value.len(), - )) + let batch = rocksdb_writebatch_wi_create(0, 0); + assert!(!batch.is_null(), "rocksdb_writebatch_create returned null"); + WriteBatchWithIndex { + batch, + db: self.clone(), + } } } - pub fn merge( - &self, - column_family: &ColumnFamily, - key: &[u8], - value: &[u8], - low_priority: bool, - ) -> Result<()> { + pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> { unsafe { - ffi_result!(rocksdb_merge_cf( + ffi_result!(rocksdb_write_writebatch_wi( self.0.db, - if low_priority { - self.0.low_priority_write_options - } else { - self.0.write_options - }, - column_family.0, - key.as_ptr() as *const c_char, - key.len(), - value.as_ptr() as *const c_char, - value.len(), + self.0.write_options, + batch.batch )) - } - } - - pub fn insert_empty( - &self, - column_family: &ColumnFamily, - key: &[u8], - low_priority: bool, - ) -> Result<()> { - self.insert(column_family, key, &[], low_priority) - } - - pub fn remove( - &self, - column_family: &ColumnFamily, - key: &[u8], - low_priority: bool, - ) -> Result<()> { - unsafe { - ffi_result!(rocksdb_delete_cf( - self.0.db, - if low_priority { - self.0.low_priority_write_options - } else { - self.0.write_options - }, - column_family.0, - key.as_ptr() as *const c_char, - key.len() - )) - } + }?; + batch.clear(); + Ok(()) } pub fn iter(&self, column_family: &ColumnFamily) -> Iter { @@ -517,6 +460,96 @@ pub struct ColumnFamily(*mut rocksdb_column_family_handle_t); unsafe impl Send for ColumnFamily {} unsafe impl Sync for ColumnFamily {} +pub struct WriteBatchWithIndex { + batch: *mut rocksdb_writebatch_wi_t, + db: Db, +} + +impl Drop for WriteBatchWithIndex { + fn drop(&mut self) { + unsafe { rocksdb_writebatch_wi_destroy(self.batch) } + } +} + +impl WriteBatchWithIndex { + pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) { + unsafe { + rocksdb_writebatch_wi_put_cf( + self.batch, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + ) + } + } + + pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) { + self.insert(column_family, key, &[]) + } + + pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) { + unsafe { + rocksdb_writebatch_wi_delete_cf( + self.batch, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + ) + } + } + + pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) { + unsafe { + rocksdb_writebatch_wi_merge_cf( + self.batch, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + ) + } + } + + pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result> { + unsafe { + let mut len = 0; + let base = ffi_result!(rocksdb_writebatch_wi_get_from_batch_and_db_cf( + self.batch, + self.db.0.db, + self.db.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + &mut len + ))? as *mut u8; + Ok(if base.is_null() { + None + } else { + Some(Buffer { base, len }) + }) + } + } + + pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result { + Ok(self.get(column_family, key)?.is_some()) //TODO: optimize + } + + pub fn clear(&mut self) { + unsafe { + rocksdb_writebatch_wi_clear(self.batch); + } + } + + pub fn len(&self) -> usize { + unsafe { rocksdb_writebatch_wi_count(self.batch) } + .try_into() + .unwrap() + } +} + pub struct PinnableSlice(*mut rocksdb_pinnableslice_t); impl Drop for PinnableSlice { @@ -551,6 +584,39 @@ impl Borrow<[u8]> for PinnableSlice { } } +pub struct Buffer { + base: *mut u8, + len: usize, +} + +impl Drop for Buffer { + fn drop(&mut self) { + unsafe { + free(self.base as *mut c_void); + } + } +} + +impl Deref for Buffer { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.base, self.len) } + } +} + +impl AsRef<[u8]> for Buffer { + fn as_ref(&self) -> &[u8] { + &*self + } +} + +impl Borrow<[u8]> for Buffer { + fn borrow(&self) -> &[u8] { + &*self + } +} + pub struct Iter { iter: *mut rocksdb_iterator_t, is_currently_valid: bool, @@ -615,7 +681,7 @@ impl Iter { fn convert_error(ptr: *const c_char) -> Error { let message = unsafe { let s = CStr::from_ptr(ptr).to_string_lossy().into_owned(); - libc::free(ptr as *mut c_void); + free(ptr as *mut c_void); s }; other_error(message) diff --git a/lib/src/storage/binary_encoder.rs b/lib/src/storage/binary_encoder.rs index 81a4aa05..3edd6c3b 100644 --- a/lib/src/storage/binary_encoder.rs +++ b/lib/src/storage/binary_encoder.rs @@ -666,12 +666,7 @@ mod tests { impl MemoryStrStore { fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) { - insert_term(term, encoded, |h, v| { - self.insert_str(h, v); - let r: Result<(), Infallible> = Ok(()); - r - }) - .unwrap(); + insert_term(term, encoded, &mut |h, v| self.insert_str(h, v)) } fn insert_str(&self, key: &StrHash, value: &str) { diff --git a/lib/src/storage/io.rs b/lib/src/storage/io.rs index 1ef4c8b0..ae6edc79 100644 --- a/lib/src/storage/io.rs +++ b/lib/src/storage/io.rs @@ -5,43 +5,33 @@ use crate::io::{ DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer, }; use crate::model::{GraphNameRef, Quad, Triple}; -use crate::storage::StorageLike; -use std::io; -use std::io::{BufRead, Write}; +use crate::storage::StorageWriter; +use std::io::{BufRead, Result, Write}; -pub fn load_graph( - store: &S, +pub fn load_graph( + writer: &mut StorageWriter, reader: impl BufRead, format: GraphFormat, to_graph_name: GraphNameRef<'_>, base_iri: Option<&str>, -) -> Result<(), StoreOrParseError> { +) -> Result<()> { let mut parser = GraphParser::from_format(format); if let Some(base_iri) = base_iri { parser = parser .with_base_iri(base_iri) - .map_err(|e| StoreOrParseError::Parse(invalid_input_error(e)))?; + .map_err(invalid_input_error)?; } - for t in parser - .read_triples(reader) - .map_err(StoreOrParseError::Parse)? - { - store - .insert( - t.map_err(StoreOrParseError::Parse)? - .as_ref() - .in_graph(to_graph_name), - ) - .map_err(StoreOrParseError::Store)?; + for t in parser.read_triples(reader)? { + writer.insert(t?.as_ref().in_graph(to_graph_name))?; } Ok(()) } pub fn dump_graph( - triples: impl Iterator>, + triples: impl Iterator>, writer: impl Write, format: GraphFormat, -) -> io::Result<()> { +) -> Result<()> { let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?; for triple in triples { writer.write(&triple?)?; @@ -49,50 +39,32 @@ pub fn dump_graph( writer.finish() } -pub fn load_dataset( - store: &S, +pub fn load_dataset( + writer: &mut StorageWriter, reader: impl BufRead, format: DatasetFormat, base_iri: Option<&str>, -) -> Result<(), StoreOrParseError> { +) -> Result<()> { let mut parser = DatasetParser::from_format(format); if let Some(base_iri) = base_iri { parser = parser .with_base_iri(base_iri) - .map_err(|e| StoreOrParseError::Parse(invalid_input_error(e)))?; + .map_err(invalid_input_error)?; } - for t in parser - .read_quads(reader) - .map_err(StoreOrParseError::Parse)? - { - store - .insert(t.map_err(StoreOrParseError::Parse)?.as_ref()) - .map_err(StoreOrParseError::Store)?; + for t in parser.read_quads(reader)? { + writer.insert(t?.as_ref())?; } Ok(()) } pub fn dump_dataset( - quads: impl Iterator>, + quads: impl Iterator>, writer: impl Write, format: DatasetFormat, -) -> io::Result<()> { +) -> Result<()> { let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?; for quad in quads { writer.write(&quad?)?; } writer.finish() } - -pub enum StoreOrParseError { - Store(S), - Parse(io::Error), -} - -impl From> for io::Error { - fn from(error: StoreOrParseError) -> Self { - match error { - StoreOrParseError::Store(error) | StoreOrParseError::Parse(error) => error, - } - } -} diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index c73fd06d..6fe575a4 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -11,7 +11,7 @@ use crate::storage::numeric_encoder::{ }; use backend::{ ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, - MergeOperator, + MergeOperator, WriteBatchWithIndex, }; use std::ffi::CString; use std::io::Result; @@ -36,6 +36,7 @@ const DPOS_CF: &str = "dpos"; const DOSP_CF: &str = "dosp"; const GRAPHS_CF: &str = "graphs"; const DEFAULT_CF: &str = "default"; +const AUTO_WRITE_BATCH_THRESHOLD: usize = 1024; /// Low level storage primitives #[derive(Clone)] @@ -211,34 +212,42 @@ impl Storage { let mut version = this.ensure_version()?; if version == 0 { + let mut batch = this.db.new_batch(); // We migrate to v1 for quad in this.quads() { let quad = quad?; if !quad.graph_name.is_default_graph() { - this.db - .insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?; + batch.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name)); + if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD { + this.db.write(&mut batch)?; + } } } + this.db.write(&mut batch)?; this.db.flush(&this.graphs_cf)?; version = 1; - this.set_version(version)?; - this.db.flush(&this.default_cf)?; + this.update_version(version)?; } if version == 1 { // We migrate to v2 + let mut batch = this.db.new_batch(); let mut iter = this.db.iter(&this.id2str_cf); while let (Some(key), Some(value)) = (iter.key(), iter.value()) { let mut new_value = Vec::with_capacity(value.len() + 4); new_value.extend_from_slice(&i32::MAX.to_be_bytes()); new_value.extend_from_slice(value); - this.db.insert(&this.id2str_cf, key, &new_value, false)?; + batch.insert(&this.id2str_cf, key, &new_value); iter.next(); + if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD { + this.db.write(&mut batch)?; + batch.clear(); + } } + this.db.write(&mut batch)?; iter.status()?; this.db.flush(&this.id2str_cf)?; version = 2; - this.set_version(version)?; - this.db.flush(&this.default_cf)?; + this.update_version(version)?; } match version { @@ -261,20 +270,17 @@ impl Storage { buffer.copy_from_slice(&version); u64::from_be_bytes(buffer) } else { - self.set_version(LATEST_STORAGE_VERSION)?; + self.update_version(LATEST_STORAGE_VERSION)?; LATEST_STORAGE_VERSION }, ) } - fn set_version(&self, version: u64) -> Result<()> { - self.db.insert( - &self.default_cf, - b"oxversion", - &version.to_be_bytes(), - false, - )?; - Ok(()) + fn update_version(&self, version: u64) -> Result<()> { + let mut batch = self.db.new_batch(); + batch.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()); + self.db.write(&mut batch)?; + self.db.flush(&self.default_cf) } pub fn len(&self) -> Result { @@ -589,241 +595,22 @@ impl Storage { } } - pub fn insert(&self, quad: QuadRef<'_>) -> Result { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - let encoded = quad.into(); - - Ok(if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, &encoded); - if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? { - false - } else { - self.insert_quad_triple(quad, &encoded)?; - - self.db - .insert_empty(&self.dspo_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_pos_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.dpos_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_osp_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.dosp_cf, buffer.as_slice(), false)?; - buffer.clear(); - - true - } - } else { - write_spog_quad(&mut buffer, &encoded); - if self.db.contains_key(&self.spog_cf, buffer.as_slice())? { - false - } else { - self.insert_quad_triple(quad, &encoded)?; - - self.db - .insert_empty(&self.spog_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_posg_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.posg_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_ospg_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.ospg_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gspo_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.gspo_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gpos_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.gpos_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gosp_quad(&mut buffer, &encoded); - self.db - .insert_empty(&self.gosp_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_term(&mut buffer, &encoded.graph_name); - if !self.db.contains_key(&self.graphs_cf, &buffer)? { - self.db.insert_empty(&self.graphs_cf, &buffer, false)?; - self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; - } - buffer.clear(); - - true - } - }) - } - - pub fn remove(&self, quad: QuadRef<'_>) -> Result { - self.remove_encoded(&quad.into()) - } - - fn remove_encoded(&self, quad: &EncodedQuad) -> Result { - let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1); - - Ok(if quad.graph_name.is_default_graph() { - write_spo_quad(&mut buffer, quad); - - if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? { - self.db.remove(&self.dspo_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_pos_quad(&mut buffer, quad); - self.db.remove(&self.dpos_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_osp_quad(&mut buffer, quad); - self.db.remove(&self.dosp_cf, buffer.as_slice(), false)?; - buffer.clear(); - - self.remove_quad_triple(quad)?; - - true - } else { - false - } - } else { - write_spog_quad(&mut buffer, quad); - - if self.db.contains_key(&self.spog_cf, buffer.as_slice())? { - self.db.remove(&self.spog_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_posg_quad(&mut buffer, quad); - self.db.remove(&self.posg_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_ospg_quad(&mut buffer, quad); - self.db.remove(&self.ospg_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gspo_quad(&mut buffer, quad); - self.db.remove(&self.gspo_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gpos_quad(&mut buffer, quad); - self.db.remove(&self.gpos_cf, buffer.as_slice(), false)?; - buffer.clear(); - - write_gosp_quad(&mut buffer, quad); - self.db.remove(&self.gosp_cf, buffer.as_slice(), false)?; - buffer.clear(); - - self.remove_quad_triple(quad)?; - - true - } else { - false - } - }) - } - - pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> Result { - let encoded_graph_name = graph_name.into(); - let encoded = encode_term(&encoded_graph_name); - Ok(if self.db.contains_key(&self.graphs_cf, &encoded)? { - false - } else { - self.db.insert_empty(&self.graphs_cf, &encoded, false)?; - self.insert_term(graph_name.into(), &encoded_graph_name)?; - true - }) - } - - pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> Result<()> { - for quad in self.quads_for_graph(&graph_name.into()) { - self.remove_encoded(&quad?)?; + pub fn atomic_writer(&self) -> StorageWriter { + StorageWriter { + buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE), + batch: self.db.new_batch(), + storage: self.clone(), + auto_commit: false, } - Ok(()) } - pub fn clear_all_named_graphs(&self) -> Result<()> { - for quad in self.quads_in_named_graph() { - self.remove_encoded(&quad?)?; + pub fn simple_writer(&self) -> StorageWriter { + StorageWriter { + buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE), + batch: self.db.new_batch(), + storage: self.clone(), + auto_commit: true, } - Ok(()) - } - - pub fn clear_all_graphs(&self) -> Result<()> { - for quad in self.quads() { - self.remove_encoded(&quad?)?; - } - Ok(()) - } - - pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> Result { - self.remove_encoded_named_graph(&graph_name.into()) - } - - fn remove_encoded_named_graph(&self, graph_name: &EncodedTerm) -> Result { - for quad in self.quads_for_graph(graph_name) { - self.remove_encoded(&quad?)?; - } - let encoded_graph = encode_term(graph_name); - Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? { - self.db.remove(&self.graphs_cf, &encoded_graph, false)?; - self.remove_term(graph_name)?; - true - } else { - false - }) - } - - pub fn remove_all_named_graphs(&self) -> Result<()> { - for graph_name in self.named_graphs() { - self.remove_encoded_named_graph(&graph_name?)?; - } - Ok(()) - } - - pub fn clear(&self) -> Result<()> { - for graph_name in self.named_graphs() { - self.remove_encoded_named_graph(&graph_name?)?; - } - for quad in self.quads() { - self.remove_encoded(&quad?)?; - } - Ok(()) - } - - fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> { - insert_term(term, encoded, |key, value| self.insert_str(key, value)) - } - - fn insert_graph_name(&self, graph_name: GraphNameRef<'_>, encoded: &EncodedTerm) -> Result<()> { - match graph_name { - GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded), - GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded), - GraphNameRef::DefaultGraph => Ok(()), - } - } - - fn insert_quad_triple(&self, quad: QuadRef<'_>, encoded: &EncodedQuad) -> Result<()> { - self.insert_term(quad.subject.into(), &encoded.subject)?; - self.insert_term(quad.predicate.into(), &encoded.predicate)?; - self.insert_term(quad.object, &encoded.object)?; - Ok(()) - } - - fn remove_term(&self, encoded: &EncodedTerm) -> Result<()> { - remove_term(encoded, |key| self.remove_str(key)) - } - - fn remove_quad_triple(&self, encoded: &EncodedQuad) -> Result<()> { - self.remove_term(&encoded.subject)?; - self.remove_term(&encoded.predicate)?; - self.remove_term(&encoded.object)?; - Ok(()) } #[cfg(not(target_arch = "wasm32"))] @@ -879,23 +666,6 @@ impl Storage { i32::from_be_bytes(v[..4].try_into().unwrap()) > 0 })) } - - fn insert_str(&self, key: &StrHash, value: &str) -> Result<()> { - let mut buffer = Vec::with_capacity(value.len() + 4); - buffer.extend_from_slice(&1_i32.to_be_bytes()); - buffer.extend_from_slice(value.as_bytes()); - self.db - .merge(&self.id2str_cf, &key.to_be_bytes(), &buffer, false) - } - - fn remove_str(&self, key: &StrHash) -> Result<()> { - self.db.merge( - &self.id2str_cf, - &key.to_be_bytes(), - &(-1_i32).to_be_bytes(), - true, - ) - } } pub struct ChainedDecodingQuadIterator { @@ -968,12 +738,6 @@ impl Iterator for DecodingGraphIterator { } } -pub trait StorageLike: StrLookup { - fn insert(&self, quad: QuadRef<'_>) -> Result; - - fn remove(&self, quad: QuadRef<'_>) -> Result; -} - impl StrLookup for Storage { type Error = std::io::Error; @@ -986,13 +750,286 @@ impl StrLookup for Storage { } } -impl StorageLike for Storage { - fn insert(&self, quad: QuadRef<'_>) -> Result { - self.insert(quad) +pub struct StorageWriter { + buffer: Vec, + batch: WriteBatchWithIndex, + storage: Storage, + auto_commit: bool, +} + +impl StorageWriter { + pub fn insert(&mut self, quad: QuadRef<'_>) -> Result { + let encoded = quad.into(); + self.buffer.clear(); + let result = if quad.graph_name.is_default_graph() { + write_spo_quad(&mut self.buffer, &encoded); + if self + .batch + .contains_key(&self.storage.dspo_cf, &self.buffer)? + { + false + } else { + self.batch.insert_empty(&self.storage.dspo_cf, &self.buffer); + + self.buffer.clear(); + write_pos_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.dpos_cf, &self.buffer); + + self.buffer.clear(); + write_osp_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.dosp_cf, &self.buffer); + + self.insert_term(quad.subject.into(), &encoded.subject); + self.insert_term(quad.predicate.into(), &encoded.predicate); + self.insert_term(quad.object, &encoded.object); + true + } + } else { + write_spog_quad(&mut self.buffer, &encoded); + if self + .batch + .contains_key(&self.storage.spog_cf, &self.buffer)? + { + false + } else { + self.batch.insert_empty(&self.storage.spog_cf, &self.buffer); + + self.buffer.clear(); + write_posg_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.posg_cf, &self.buffer); + + self.buffer.clear(); + write_ospg_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.ospg_cf, &self.buffer); + + self.buffer.clear(); + write_gspo_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.gspo_cf, &self.buffer); + + self.buffer.clear(); + write_gpos_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.gpos_cf, &self.buffer); + + self.buffer.clear(); + write_gosp_quad(&mut self.buffer, &encoded); + self.batch.insert_empty(&self.storage.gosp_cf, &self.buffer); + + self.insert_term(quad.subject.into(), &encoded.subject); + self.insert_term(quad.predicate.into(), &encoded.predicate); + self.insert_term(quad.object, &encoded.object); + + self.buffer.clear(); + write_term(&mut self.buffer, &encoded.graph_name); + if !self + .batch + .contains_key(&self.storage.graphs_cf, &self.buffer)? + { + self.batch + .insert_empty(&self.storage.graphs_cf, &self.buffer); + self.insert_graph_name(quad.graph_name, &encoded.graph_name); + } + true + } + }; + self.write_if_needed()?; + Ok(result) + } + + pub fn insert_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result { + let encoded_graph_name = graph_name.into(); + + self.buffer.clear(); + write_term(&mut self.buffer, &encoded_graph_name); + let result = if self + .batch + .contains_key(&self.storage.graphs_cf, &self.buffer)? + { + false + } else { + self.batch + .insert_empty(&self.storage.graphs_cf, &self.buffer); + self.insert_term(graph_name.into(), &encoded_graph_name); + true + }; + self.write_if_needed()?; + Ok(result) + } + + fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) { + insert_term(term, encoded, &mut |key, value| self.insert_str(key, value)) + } + + fn insert_graph_name(&mut self, graph_name: GraphNameRef<'_>, encoded: &EncodedTerm) { + match graph_name { + GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded), + GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded), + GraphNameRef::DefaultGraph => (), + } + } + + fn insert_str(&mut self, key: &StrHash, value: &str) { + self.buffer.clear(); + self.buffer.extend_from_slice(&1_i32.to_be_bytes()); + self.buffer.extend_from_slice(value.as_bytes()); + self.batch + .merge(&self.storage.id2str_cf, &key.to_be_bytes(), &self.buffer); + } + + pub fn remove(&mut self, quad: QuadRef<'_>) -> Result { + self.remove_encoded(&quad.into()) + } + + fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result { + self.buffer.clear(); + let result = if quad.graph_name.is_default_graph() { + write_spo_quad(&mut self.buffer, quad); + + if self + .batch + .contains_key(&self.storage.dspo_cf, &self.buffer)? + { + self.batch.remove(&self.storage.dspo_cf, &self.buffer); + + self.buffer.clear(); + write_pos_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.dpos_cf, &self.buffer); + + self.buffer.clear(); + write_osp_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.dosp_cf, &self.buffer); + + self.remove_term(&quad.subject); + self.remove_term(&quad.predicate); + self.remove_term(&quad.object); + true + } else { + false + } + } else { + write_spog_quad(&mut self.buffer, quad); + + if self + .batch + .contains_key(&self.storage.spog_cf, &self.buffer)? + { + self.batch.remove(&self.storage.spog_cf, &self.buffer); + + self.buffer.clear(); + write_posg_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.posg_cf, &self.buffer); + + self.buffer.clear(); + write_ospg_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.ospg_cf, &self.buffer); + + self.buffer.clear(); + write_gspo_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.gspo_cf, &self.buffer); + + self.buffer.clear(); + write_gpos_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.gpos_cf, &self.buffer); + + self.buffer.clear(); + write_gosp_quad(&mut self.buffer, quad); + self.batch.remove(&self.storage.gosp_cf, &self.buffer); + + self.remove_term(&quad.subject); + self.remove_term(&quad.predicate); + self.remove_term(&quad.object); + true + } else { + false + } + }; + self.write_if_needed()?; + Ok(result) } - fn remove(&self, quad: QuadRef<'_>) -> Result { - self.remove(quad) + pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<()> { + for quad in self.storage.quads_for_graph(&graph_name.into()) { + self.remove_encoded(&quad?)?; + } + Ok(()) + } + + pub fn clear_all_named_graphs(&mut self) -> Result<()> { + for quad in self.storage.quads_in_named_graph() { + self.remove_encoded(&quad?)?; + } + Ok(()) + } + + pub fn clear_all_graphs(&mut self) -> Result<()> { + for quad in self.storage.quads() { + self.remove_encoded(&quad?)?; + } + Ok(()) + } + + pub fn remove_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result { + self.remove_encoded_named_graph(&graph_name.into()) + } + + fn remove_encoded_named_graph(&mut self, graph_name: &EncodedTerm) -> Result { + for quad in self.storage.quads_for_graph(graph_name) { + self.remove_encoded(&quad?)?; + } + self.buffer.clear(); + write_term(&mut self.buffer, graph_name); + let result = if self + .batch + .contains_key(&self.storage.graphs_cf, &self.buffer)? + { + self.batch.remove(&self.storage.graphs_cf, &self.buffer); + self.remove_term(graph_name); + true + } else { + false + }; + self.write_if_needed()?; + Ok(result) + } + + pub fn remove_all_named_graphs(&mut self) -> Result<()> { + for graph_name in self.storage.named_graphs() { + self.remove_encoded_named_graph(&graph_name?)?; + } + Ok(()) + } + + pub fn clear(&mut self) -> Result<()> { + for graph_name in self.storage.named_graphs() { + self.remove_encoded_named_graph(&graph_name?)?; + } + for quad in self.storage.quads() { + self.remove_encoded(&quad?)?; + } + Ok(()) + } + + fn remove_term(&mut self, encoded: &EncodedTerm) { + remove_term(encoded, &mut |key| self.remove_str(key)); + } + + fn remove_str(&mut self, key: &StrHash) { + self.batch.merge( + &self.storage.id2str_cf, + &key.to_be_bytes(), + &(-1_i32).to_be_bytes(), + ) + } + + fn write_if_needed(&mut self) -> Result<()> { + if self.auto_commit && self.batch.len() > AUTO_WRITE_BATCH_THRESHOLD { + self.commit()?; + } + Ok(()) + } + + pub fn commit(&mut self) -> Result<()> { + self.storage.db.write(&mut self.batch)?; + Ok(()) } } @@ -1017,11 +1054,11 @@ mod tests { ); let storage = Storage::new()?; - storage.insert(quad)?; - storage.insert(quad2)?; - storage.remove(quad2)?; - storage.flush()?; - storage.db.compact(&storage.id2str_cf)?; + let mut writer = storage.atomic_writer(); + writer.insert(quad)?; + writer.insert(quad2)?; + writer.remove(quad2)?; + writer.commit()?; assert!(storage .get_str(&StrHash::new("http://example.com/s"))? .is_some()); @@ -1031,7 +1068,8 @@ mod tests { assert!(storage .get_str(&StrHash::new("http://example.com/o2"))? .is_none()); - storage.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + writer.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + writer.commit()?; assert!(storage .get_str(&StrHash::new("http://example.com/s"))? .is_none()); @@ -1044,7 +1082,8 @@ mod tests { assert!(storage .get_str(&StrHash::new("http://example.com/g"))? .is_some()); - storage.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + writer.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?; + writer.commit()?; assert!(storage .get_str(&StrHash::new("http://example.com/g"))? .is_none()); diff --git a/lib/src/storage/numeric_encoder.rs b/lib/src/storage/numeric_encoder.rs index 89dfc502..dd7e1688 100644 --- a/lib/src/storage/numeric_encoder.rs +++ b/lib/src/storage/numeric_encoder.rs @@ -665,11 +665,11 @@ pub trait StrLookup { fn contains_str(&self, key: &StrHash) -> Result; } -pub fn insert_term Result<(), E> + Copy>( +pub fn insert_term( term: TermRef<'_>, encoded: &EncodedTerm, - insert_str: F, -) -> Result<(), E> { + insert_str: &mut F, +) { match term { TermRef::NamedNode(node) => { if let EncodedTerm::NamedNode { iri_id } = encoded { @@ -680,7 +680,7 @@ pub fn insert_term Result<(), E> + Copy>( } TermRef::BlankNode(node) => match encoded { EncodedTerm::BigBlankNode { id_id } => insert_str(id_id, node.as_str()), - EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => Ok(()), + EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => (), _ => unreachable!("Invalid term encoding {:?} for {}", encoded, term), }, TermRef::Literal(literal) => match encoded { @@ -699,7 +699,7 @@ pub fn insert_term Result<(), E> + Copy>( value_id, language_id, } => { - insert_str(value_id, literal.value())?; + insert_str(value_id, literal.value()); if let Some(language) = literal.language() { insert_str(language_id, language) } else { @@ -713,7 +713,7 @@ pub fn insert_term Result<(), E> + Copy>( value_id, datatype_id, } => { - insert_str(value_id, literal.value())?; + insert_str(value_id, literal.value()); insert_str(datatype_id, literal.datatype().as_str()) } EncodedTerm::SmallStringLiteral(..) @@ -733,17 +733,17 @@ pub fn insert_term Result<(), E> + Copy>( | EncodedTerm::GMonthLiteral(..) | EncodedTerm::DurationLiteral(..) | EncodedTerm::YearMonthDurationLiteral(..) - | EncodedTerm::DayTimeDurationLiteral(..) => Ok(()), + | EncodedTerm::DayTimeDurationLiteral(..) => (), _ => unreachable!("Invalid term encoding {:?} for {}", encoded, term), }, TermRef::Triple(triple) => { if let EncodedTerm::Triple(encoded) = encoded { - insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str)?; + insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str); insert_term( triple.predicate.as_ref().into(), &encoded.predicate, insert_str, - )?; + ); insert_term(triple.object.as_ref(), &encoded.object, insert_str) } else { unreachable!("Invalid term encoding {:?} for {}", encoded, term) @@ -752,10 +752,7 @@ pub fn insert_term Result<(), E> + Copy>( } } -pub fn remove_term Result<(), E> + Copy>( - encoded: &EncodedTerm, - remove_str: F, -) -> Result<(), E> { +pub fn remove_term(encoded: &EncodedTerm, remove_str: &mut F) { match encoded { EncodedTerm::NamedNode { iri_id } => remove_str(iri_id), EncodedTerm::BigBlankNode { id_id } => remove_str(id_id), @@ -766,7 +763,7 @@ pub fn remove_term Result<(), E> + Copy>( value_id, language_id, } => { - remove_str(value_id)?; + remove_str(value_id); remove_str(language_id) } EncodedTerm::SmallTypedLiteral { datatype_id, .. } => remove_str(datatype_id), @@ -774,15 +771,15 @@ pub fn remove_term Result<(), E> + Copy>( value_id, datatype_id, } => { - remove_str(value_id)?; + remove_str(value_id); remove_str(datatype_id) } EncodedTerm::Triple(encoded) => { - remove_term(&encoded.subject, remove_str)?; - remove_term(&encoded.predicate, remove_str)?; + remove_term(&encoded.subject, remove_str); + remove_term(&encoded.predicate, remove_str); remove_term(&encoded.object, remove_str) } - _ => Ok(()), + _ => (), } } diff --git a/lib/src/store.rs b/lib/src/store.rs index dc343967..986d068b 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -230,10 +230,8 @@ impl Store { /// Loads a graph file (i.e. triples) into the store /// - /// Warning: This functions saves the triples in a not atomic way. - /// If the parsing fails in the middle of the file only a part of it may be written to the store. - /// It might leave the store in a bad state if a crash happens during a triple insertion. - /// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that. + /// Warning: This function is not atomic. If an error happens during the file parsing, only a + /// part of the file might be written to the store. /// /// Usage example: /// ``` @@ -263,22 +261,15 @@ impl Store { to_graph_name: impl Into>, base_iri: Option<&str>, ) -> io::Result<()> { - load_graph( - &self.storage, - reader, - format, - to_graph_name.into(), - base_iri, - )?; - Ok(()) + let mut writer = self.storage.simple_writer(); + load_graph(&mut writer, reader, format, to_graph_name.into(), base_iri)?; + writer.commit() } /// Loads a dataset file (i.e. quads) into the store. /// - /// Warning: This functions saves the triples in a not atomic way. - /// If the parsing fails in the middle of the file, only a part of it may be written to the store. - /// It might leave the store in a bad state if a crash happens during a quad insertion. - /// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that. + /// Warning: This function is not atomic. If an error happens during the file parsing, only a + /// part of the file might be written to the store. /// /// Usage example: /// ``` @@ -307,18 +298,15 @@ impl Store { format: DatasetFormat, base_iri: Option<&str>, ) -> io::Result<()> { - load_dataset(&self.storage, reader, format, base_iri)?; - Ok(()) + let mut writer = self.storage.simple_writer(); + load_dataset(&mut writer, reader, format, base_iri)?; + writer.commit() } /// Adds a quad to this store. /// /// Returns `true` if the quad was not already in the store. /// - /// This method is optimized for performances and is not atomic. - /// It might leave the store in a bad state if a crash happens during the insertion. - /// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that. - /// /// Usage example: /// ``` /// use oxigraph::store::Store; @@ -334,17 +322,16 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn insert<'a>(&self, quad: impl Into>) -> io::Result { - self.storage.insert(quad.into()) + let mut writer = self.storage.atomic_writer(); + let result = writer.insert(quad.into())?; + writer.commit()?; + Ok(result) } /// Removes a quad from this store. /// /// Returns `true` if the quad was in the store and has been removed. /// - /// This method is optimized for performances and is not atomic. - /// It might leave the store in a bad state if a crash happens during the removal. - /// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that. - /// /// Usage example: /// ``` /// use oxigraph::store::Store; @@ -361,7 +348,10 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn remove<'a>(&self, quad: impl Into>) -> io::Result { - self.storage.remove(quad.into()) + let mut writer = self.storage.atomic_writer(); + let result = writer.remove(quad.into())?; + writer.commit()?; + Ok(result) } /// Dumps a store graph into a file. @@ -478,7 +468,10 @@ impl Store { &self, graph_name: impl Into>, ) -> io::Result { - self.storage.insert_named_graph(graph_name.into()) + let mut writer = self.storage.atomic_writer(); + let result = writer.insert_named_graph(graph_name.into())?; + writer.commit()?; + Ok(result) } /// Clears a graph from this store. @@ -500,7 +493,9 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn clear_graph<'a>(&self, graph_name: impl Into>) -> io::Result<()> { - self.storage.clear_graph(graph_name.into()) + let mut writer = self.storage.simple_writer(); + writer.clear_graph(graph_name.into())?; + writer.commit() } /// Removes a graph from this store. @@ -527,7 +522,10 @@ impl Store { &self, graph_name: impl Into>, ) -> io::Result { - self.storage.remove_named_graph(graph_name.into()) + let mut writer = self.storage.simple_writer(); + let result = writer.remove_named_graph(graph_name.into())?; + writer.commit()?; + Ok(result) } /// Clears the store. @@ -548,7 +546,9 @@ impl Store { /// # Result::<_,Box>::Ok(()) /// ``` pub fn clear(&self) -> io::Result<()> { - self.storage.clear() + let mut writer = self.storage.simple_writer(); + writer.clear()?; + writer.commit() } /// Flushes all buffers and ensures that all writes are saved on disk. @@ -599,14 +599,19 @@ impl Store { /// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind. /// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds. /// Errors related to data loading into the store use the other error kinds. + #[cfg(not(target_arch = "wasm32"))] pub fn create_from_dataset( path: &Path, reader: impl BufRead, format: DatasetFormat, base_iri: Option<&str>, ) -> io::Result<()> { - let storage = Storage::open(path.as_ref(), false)?; - load_dataset(&storage, reader, format, base_iri)?; + let storage = Storage::open(path, false)?; + { + let mut writer = storage.simple_writer(); + load_dataset(&mut writer, reader, format, base_iri)?; + writer.commit()?; + } storage.flush()?; storage.compact() }