Storage: adds write batch

Atomic insertions and huge performance improvements for RocksDB
pull/171/head
Tpt 3 years ago
parent 986d3e60bb
commit 569000b5ea
  1. 9
      lib/src/sparql/dataset.rs
  2. 36
      lib/src/sparql/update.rs
  3. 294
      lib/src/storage/backend/fallback.rs
  4. 16
      lib/src/storage/backend/mod.rs
  5. 228
      lib/src/storage/backend/rocksdb.rs
  6. 7
      lib/src/storage/binary_encoder.rs
  7. 64
      lib/src/storage/io.rs
  8. 605
      lib/src/storage/mod.rs
  9. 33
      lib/src/storage/numeric_encoder.rs
  10. 73
      lib/src/store.rs

@ -6,7 +6,6 @@ use crate::storage::Storage;
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::convert::Infallible;
use std::iter::empty;
pub struct DatasetView {
@ -141,11 +140,9 @@ impl DatasetView {
pub fn encode_term<'a>(&self, term: impl Into<TermRef<'a>>) -> EncodedTerm {
let term = term.into();
let encoded = term.into();
insert_term::<Infallible, _>(term, &encoded, |key, value| {
self.insert_str(key, value);
Ok(())
})
.unwrap(); // Can not fail
insert_term(term, &encoded, &mut |key, value| {
self.insert_str(key, value)
});
encoded
}

@ -12,7 +12,7 @@ use crate::sparql::plan_builder::PlanBuilder;
use crate::sparql::{EvaluationError, UpdateOptions};
use crate::storage::io::load_graph;
use crate::storage::numeric_encoder::{Decoder, EncodedTerm};
use crate::storage::Storage;
use crate::storage::{Storage, StorageWriter};
use oxiri::Iri;
use spargebra::algebra::{GraphPattern, GraphTarget};
use spargebra::term::{
@ -29,6 +29,7 @@ use std::rc::Rc;
pub struct SimpleUpdateEvaluator<'a> {
storage: &'a Storage,
writer: StorageWriter,
base_iri: Option<Rc<Iri<String>>>,
options: UpdateOptions,
client: Client,
@ -41,8 +42,10 @@ impl<'a> SimpleUpdateEvaluator<'a> {
options: UpdateOptions,
) -> Self {
let client = Client::new(options.query_options.http_timeout);
let writer = storage.atomic_writer();
Self {
storage,
writer,
base_iri,
options,
client,
@ -56,6 +59,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
) -> Result<(), EvaluationError> {
for (update, using_dataset) in updates.iter().zip(using_datasets) {
self.eval(update, using_dataset)?;
self.writer.commit()?;
}
Ok(())
}
@ -99,7 +103,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
let mut bnodes = HashMap::new();
for quad in data {
let quad = Self::convert_quad(quad, &mut bnodes);
self.storage.insert(quad.as_ref())?;
self.writer.insert(quad.as_ref())?;
}
Ok(())
}
@ -107,7 +111,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
fn eval_delete_data(&mut self, data: &[GroundQuad]) -> Result<(), EvaluationError> {
for quad in data {
let quad = Self::convert_ground_quad(quad);
self.storage.remove(quad.as_ref())?;
self.writer.remove(quad.as_ref())?;
}
Ok(())
}
@ -133,7 +137,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
if let Some(quad) =
Self::convert_ground_quad_pattern(quad, &variables, &tuple, &dataset)?
{
self.storage.remove(quad.as_ref())?;
self.writer.remove(quad.as_ref())?;
if !insert.is_empty() {
// Hack to make sure the triple terms are still available for an insert
dataset.encode_term(quad.subject.as_ref());
@ -146,7 +150,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
if let Some(quad) =
Self::convert_quad_pattern(quad, &variables, &tuple, &dataset, &mut bnodes)?
{
self.storage.insert(quad.as_ref())?;
self.writer.insert(quad.as_ref())?;
}
}
bnodes.clear();
@ -170,7 +174,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
GraphName::DefaultGraph => GraphNameRef::DefaultGraph,
};
load_graph(
self.storage,
&mut self.writer,
BufReader::new(body),
format,
to_graph_name,
@ -182,7 +186,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
fn eval_create(&mut self, graph_name: &NamedNode, silent: bool) -> Result<(), EvaluationError> {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.insert_named_graph(graph_name.into())? || silent {
if self.writer.insert_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
@ -197,7 +201,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
GraphTarget::NamedNode(graph_name) => {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.contains_named_graph(&graph_name.into())? {
Ok(self.storage.clear_graph(graph_name.into())?)
Ok(self.writer.clear_graph(graph_name.into())?)
} else if silent {
Ok(())
} else {
@ -208,11 +212,11 @@ impl<'a> SimpleUpdateEvaluator<'a> {
}
}
GraphTarget::DefaultGraph => {
self.storage.clear_graph(GraphNameRef::DefaultGraph)?;
self.writer.clear_graph(GraphNameRef::DefaultGraph)?;
Ok(())
}
GraphTarget::NamedGraphs => Ok(self.storage.clear_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.storage.clear_all_graphs()?),
GraphTarget::NamedGraphs => Ok(self.writer.clear_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.writer.clear_all_graphs()?),
}
}
@ -220,7 +224,7 @@ impl<'a> SimpleUpdateEvaluator<'a> {
match graph {
GraphTarget::NamedNode(graph_name) => {
let graph_name = NamedNodeRef::new_unchecked(&graph_name.iri);
if self.storage.remove_named_graph(graph_name.into())? || silent {
if self.writer.remove_named_graph(graph_name.into())? || silent {
Ok(())
} else {
Err(EvaluationError::msg(format!(
@ -229,11 +233,9 @@ impl<'a> SimpleUpdateEvaluator<'a> {
)))
}
}
GraphTarget::DefaultGraph => {
Ok(self.storage.clear_graph(GraphNameRef::DefaultGraph)?)
}
GraphTarget::NamedGraphs => Ok(self.storage.remove_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.storage.clear()?),
GraphTarget::DefaultGraph => Ok(self.writer.clear_graph(GraphNameRef::DefaultGraph)?),
GraphTarget::NamedGraphs => Ok(self.writer.remove_all_named_graphs()?),
GraphTarget::AllGraphs => Ok(self.writer.clear()?),
}
}

@ -1,43 +1,56 @@
//! TODO: This storage is dramatically naive.
use crate::storage::backend::{ColumnFamilyDefinition, CompactionAction, CompactionFilter};
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use crate::error::invalid_input_error;
use crate::storage::backend::{CompactionAction, CompactionFilter};
use std::collections::{hash_map, BTreeMap, HashMap, HashSet};
use std::ffi::CString;
use std::io::Result;
use std::iter::{once, Once};
use std::io::{Error, Result};
use std::sync::{Arc, RwLock};
pub struct ColumnFamilyDefinition {
pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
pub use_iter: bool,
pub min_prefix_size: usize,
}
#[derive(Clone)]
pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, Tree>>>);
pub struct Db(Arc<DbInternals>);
#[derive(Default)]
struct Tree {
tree: BTreeMap<Vec<u8>, Vec<u8>>,
merge_operator: Option<MergeOperator>,
compaction_filter: Option<CompactionFilter>,
struct DbInternals {
trees: RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>,
merge_operators: HashMap<ColumnFamily, MergeOperator>,
compaction_filters: HashMap<ColumnFamily, CompactionFilter>,
}
impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
let mut trees = BTreeMap::new();
let mut trees = HashMap::new();
let mut merge_operators = HashMap::new();
let mut compaction_filters = HashMap::new();
for cf in column_families {
trees.insert(
ColumnFamily(cf.name),
Tree {
tree: BTreeMap::default(),
merge_operator: cf.merge_operator,
compaction_filter: cf.compaction_filter,
},
);
let name = ColumnFamily(cf.name);
trees.insert(name.clone(), BTreeMap::default());
if let Some(me) = cf.merge_operator {
merge_operators.insert(name.clone(), me);
}
if let Some(cf) = cf.compaction_filter {
compaction_filters.insert(name.clone(), cf);
}
}
trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
Ok(Self(Arc::new(RwLock::new(trees))))
Ok(Self(Arc::new(DbInternals {
trees: RwLock::new(trees),
merge_operators,
compaction_filters,
})))
}
pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
let name = ColumnFamily(name);
if self.0.read().unwrap().contains_key(&name) {
if self.0.trees.read().unwrap().contains_key(&name) {
Some(name)
} else {
None
@ -51,27 +64,55 @@ impl Db {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self
.0
.trees
.read()
.unwrap()
.get(column_family)
.unwrap()
.tree
.get(key)
.map(|v| v.to_vec()))
.and_then(|cf| cf.get(key).cloned()))
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self
.0
.trees
.read()
.unwrap()
.get(column_family)
.unwrap()
.tree
.contains_key(key.as_ref()))
.map_or(false, |cf| cf.contains_key(key)))
}
pub fn new_batch(&self) -> WriteBatchWithIndex {
WriteBatchWithIndex {
by_cf: HashMap::new(),
db: self.clone(),
error: None,
}
}
pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> {
if let Some(error) = batch.error.take() {
return Err(error);
}
let mut trees = self.0.trees.write().unwrap();
for (cf, ops) in batch.by_cf.drain() {
let tree = trees.get_mut(&cf).ok_or_else(|| {
invalid_input_error(format!("Unsupported column family {}", cf.0))
})?;
for k in ops.to_remove {
tree.remove(&k);
}
for (k, v) in ops.to_insert {
tree.insert(k, v);
}
for (k, v) in ops.to_merge {
let v = self.exec_merge(&cf, &k, tree.get(&k).map(|v| v.as_slice()), &v)?;
tree.insert(k, v);
}
}
Ok(())
}
pub fn insert(
/*pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
@ -80,14 +121,10 @@ impl Db {
) -> Result<()> {
let mut db = self.0.write().unwrap();
let tree = db.get_mut(column_family).unwrap();
let action = if let Some(filter) = &tree.compaction_filter {
(filter.filter)(key, value)
if let Some(value) = Self::exec_filter(tree, key, value.into()) {
tree.tree.insert(key.into(), value.into())
} else {
CompactionAction::Keep
};
match action {
CompactionAction::Keep => tree.tree.insert(key.into(), value.into()),
CompactionAction::Remove => tree.tree.remove(key),
tree.tree.remove(key)
};
Ok(())
}
@ -129,50 +166,77 @@ impl Db {
let tree = db.get_mut(column_family).unwrap();
match tree.tree.entry(key.into()) {
Entry::Vacant(e) => {
let value = if let Some(merge) = &tree.merge_operator {
(merge.full)(key, None, once(value))
} else {
value.into()
};
let action = if let Some(filter) = &tree.compaction_filter {
(filter.filter)(key, &value)
} else {
CompactionAction::Keep
};
match action {
CompactionAction::Keep => {
if let Some(value) =
Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value))
{
e.insert(value);
}
CompactionAction::Remove => (),
}
}
Entry::Occupied(mut e) => {
let value = if let Some(merge) = &tree.merge_operator {
(merge.full)(key, Some(&e.get()), once(value))
if let Some(value) =
Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value))
{
e.insert(value);
} else {
value.into()
};
let action = if let Some(filter) = &tree.compaction_filter {
e.remove();
}
}
}
Ok(())
}*/
fn exec_merge(
&self,
cf: &ColumnFamily,
key: &[u8],
base: Option<&[u8]>,
value: &[u8],
) -> Result<Vec<u8>> {
let merge = self.0.merge_operators.get(cf).ok_or_else(|| {
invalid_input_error(format!("The column family {} has no merge operator", cf.0))
})?;
Ok((merge.full)(key, base, vec![value].into_iter()))
}
fn exec_partial_merge(
&self,
cf: &ColumnFamily,
key: &[u8],
a: &[u8],
b: &[u8],
) -> Result<Vec<u8>> {
let merge = self.0.merge_operators.get(cf).ok_or_else(|| {
invalid_input_error(format!("The column family {} has no merge operator", cf.0))
})?;
Ok((merge.partial)(key, vec![a, b].into_iter()))
}
fn exec_filter(&self, cf: &ColumnFamily, key: &[u8], value: Vec<u8>) -> Option<Vec<u8>> {
let action = if let Some(filter) = self.0.compaction_filters.get(cf) {
(filter.filter)(key, &value)
} else {
CompactionAction::Keep
};
match action {
CompactionAction::Keep => e.insert(value),
CompactionAction::Remove => e.remove(),
};
CompactionAction::Keep => Some(value),
CompactionAction::Remove => None,
}
}
Ok(())
}
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[])
}
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter {
let trees = self.0.read().unwrap();
let tree = &trees.get(column_family).unwrap().tree;
let trees = self.0.trees.read().unwrap();
let tree = if let Some(tree) = trees.get(column_family) {
tree
} else {
return Iter {
iter: Vec::new().into_iter(),
current: None,
};
};
let data: Vec<_> = if prefix.is_empty() {
tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} else {
@ -189,29 +253,119 @@ impl Db {
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
Ok(self
.0
.trees
.read()
.unwrap()
.get(column_family)
.unwrap()
.tree
.len())
.map_or(0, |tree| tree.len()))
}
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
Ok(self
.0
.trees
.read()
.unwrap()
.get(column_family)
.unwrap()
.tree
.is_empty())
.map_or(true, |tree| tree.is_empty()))
}
}
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ColumnFamily(&'static str);
pub struct WriteBatchWithIndex {
by_cf: HashMap<ColumnFamily, WriteBatchWithIndexCF>,
db: Db,
error: Option<Error>,
}
#[derive(Default)]
struct WriteBatchWithIndexCF {
// Evaluation order insert/remove then merge
to_insert: HashMap<Vec<u8>, Vec<u8>>,
to_merge: HashMap<Vec<u8>, Vec<u8>>,
to_remove: HashSet<Vec<u8>>,
}
impl WriteBatchWithIndex {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
let cf_state = self.by_cf.entry(column_family.clone()).or_default();
cf_state.to_insert.insert(key.into(), value.into());
cf_state.to_merge.remove(key);
cf_state.to_remove.remove(key);
}
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) {
self.insert(column_family, key, &[])
}
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) {
let cf_state = self.by_cf.entry(column_family.clone()).or_default();
cf_state.to_insert.remove(key);
cf_state.to_merge.remove(key);
cf_state.to_remove.insert(key.into());
}
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
let cf_state = self.by_cf.entry(column_family.clone()).or_default();
match cf_state.to_merge.entry(key.into()) {
hash_map::Entry::Vacant(e) => {
e.insert(value.into());
}
hash_map::Entry::Occupied(mut e) => {
match self
.db
.exec_partial_merge(column_family, key, e.get(), value)
{
Ok(value) => {
e.insert(value);
}
Err(e) => self.error = Some(e),
}
}
}
}
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
if let Some(cf_state) = self.by_cf.get(column_family) {
let value = if cf_state.to_remove.contains(key) {
None
} else if let Some(value) = cf_state.to_insert.get(key) {
Some(value.clone())
} else {
self.db.get(column_family, key)?
};
Ok(if let Some(merge) = cf_state.to_merge.get(key) {
Some(
self.db
.exec_merge(column_family, key, value.as_deref(), merge)?,
)
} else {
value
}
.and_then(|value| self.db.exec_filter(column_family, key, value)))
} else {
self.db.get(column_family, key)
}
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn clear(&mut self) {
self.by_cf.clear();
}
pub fn len(&self) -> usize {
self.by_cf
.values()
.map(|v| v.to_insert.len() + v.to_remove.len() + v.to_merge.len())
.sum()
}
}
pub struct Iter {
iter: std::vec::IntoIter<(Vec<u8>, Vec<u8>)>,
current: Option<(Vec<u8>, Vec<u8>)>,
@ -241,4 +395,4 @@ pub struct MergeOperator {
pub name: CString,
}
pub type SlicesIterator<'a> = Once<&'a [u8]>;
pub type SlicesIterator<'a> = std::vec::IntoIter<&'a [u8]>;

@ -2,9 +2,13 @@
//! RocksDB is available, if not in memory
#[cfg(target_arch = "wasm32")]
pub use fallback::{ColumnFamily, Db, Iter, MergeOperator};
pub use fallback::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex,
};
#[cfg(not(target_arch = "wasm32"))]
pub use rocksdb::{ColumnFamily, Db, Iter, MergeOperator};
pub use rocksdb::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex,
};
use std::ffi::CString;
#[cfg(target_arch = "wasm32")]
@ -12,14 +16,6 @@ mod fallback;
#[cfg(not(target_arch = "wasm32"))]
mod rocksdb;
pub struct ColumnFamilyDefinition {
pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
pub use_iter: bool,
pub min_prefix_size: usize,
}
pub struct CompactionFilter {
pub filter: fn(&[u8], &[u8]) -> CompactionAction,
pub name: CString,

@ -5,8 +5,8 @@
#![allow(unsafe_code)]
use crate::error::invalid_input_error;
use crate::storage::backend::{ColumnFamilyDefinition, CompactionAction, CompactionFilter};
use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
use crate::storage::backend::{CompactionAction, CompactionFilter};
use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t};
use oxrocksdb_sys::*;
use std::borrow::Borrow;
use std::env::temp_dir;
@ -40,6 +40,14 @@ macro_rules! ffi_result_impl {
}}
}
pub struct ColumnFamilyDefinition {
pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
pub use_iter: bool,
pub min_prefix_size: usize,
}
#[derive(Clone)]
pub struct Db(Arc<DbHandler>);
@ -51,7 +59,6 @@ struct DbHandler {
options: *mut rocksdb_options_t,
read_options: *mut rocksdb_readoptions_t,
write_options: *mut rocksdb_writeoptions_t,
low_priority_write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t,
compaction_options: *mut rocksdb_compactoptions_t,
env: Option<*mut rocksdb_env_t>,
@ -73,7 +80,6 @@ impl Drop for DbHandler {
}
rocksdb_readoptions_destroy(self.read_options);
rocksdb_writeoptions_destroy(self.write_options);
rocksdb_writeoptions_destroy(self.low_priority_write_options);
rocksdb_flushoptions_destroy(self.flush_options);
rocksdb_compactoptions_destroy(self.compaction_options);
rocksdb_options_destroy(self.options);
@ -267,13 +273,6 @@ impl Db {
rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL
}
let low_priority_write_options = rocksdb_writeoptions_create_copy(write_options);
assert!(
!low_priority_write_options.is_null(),
"rocksdb_writeoptions_create_copy returned null"
);
rocksdb_writeoptions_set_low_pri(low_priority_write_options, 1);
let flush_options = rocksdb_flushoptions_create();
assert!(
!flush_options.is_null(),
@ -291,7 +290,6 @@ impl Db {
options,
read_options,
write_options,
low_priority_write_options,
flush_options,
compaction_options,
env,
@ -359,82 +357,27 @@ impl Db {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
low_priority: bool,
) -> Result<()> {
unsafe {
ffi_result!(rocksdb_put_cf(
self.0.db,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
}
}
pub fn merge(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
low_priority: bool,
) -> Result<()> {
pub fn new_batch(&self) -> WriteBatchWithIndex {
unsafe {
ffi_result!(rocksdb_merge_cf(
self.0.db,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
let batch = rocksdb_writebatch_wi_create(0, 0);
assert!(!batch.is_null(), "rocksdb_writebatch_create returned null");
WriteBatchWithIndex {
batch,
db: self.clone(),
}
}
pub fn insert_empty(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
self.insert(column_family, key, &[], low_priority)
}
pub fn remove(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> {
unsafe {
ffi_result!(rocksdb_delete_cf(
ffi_result!(rocksdb_write_writebatch_wi(
self.0.db,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len()
self.0.write_options,
batch.batch
))
}
}?;
batch.clear();
Ok(())
}
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
@ -517,6 +460,96 @@ pub struct ColumnFamily(*mut rocksdb_column_family_handle_t);
unsafe impl Send for ColumnFamily {}
unsafe impl Sync for ColumnFamily {}
pub struct WriteBatchWithIndex {
batch: *mut rocksdb_writebatch_wi_t,
db: Db,
}
impl Drop for WriteBatchWithIndex {
fn drop(&mut self) {
unsafe { rocksdb_writebatch_wi_destroy(self.batch) }
}
}
impl WriteBatchWithIndex {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
unsafe {
rocksdb_writebatch_wi_put_cf(
self.batch,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
)
}
}
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) {
self.insert(column_family, key, &[])
}
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) {
unsafe {
rocksdb_writebatch_wi_delete_cf(
self.batch,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
)
}
}
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
unsafe {
rocksdb_writebatch_wi_merge_cf(
self.batch,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
)
}
}
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Buffer>> {
unsafe {
let mut len = 0;
let base = ffi_result!(rocksdb_writebatch_wi_get_from_batch_and_db_cf(
self.batch,
self.db.0.db,
self.db.0.read_options,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
&mut len
))? as *mut u8;
Ok(if base.is_null() {
None
} else {
Some(Buffer { base, len })
})
}
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
pub fn clear(&mut self) {
unsafe {
rocksdb_writebatch_wi_clear(self.batch);
}
}
pub fn len(&self) -> usize {
unsafe { rocksdb_writebatch_wi_count(self.batch) }
.try_into()
.unwrap()
}
}
pub struct PinnableSlice(*mut rocksdb_pinnableslice_t);
impl Drop for PinnableSlice {
@ -551,6 +584,39 @@ impl Borrow<[u8]> for PinnableSlice {
}
}
pub struct Buffer {
base: *mut u8,
len: usize,
}
impl Drop for Buffer {
fn drop(&mut self) {
unsafe {
free(self.base as *mut c_void);
}
}
}
impl Deref for Buffer {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.base, self.len) }
}
}
impl AsRef<[u8]> for Buffer {
fn as_ref(&self) -> &[u8] {
&*self
}
}
impl Borrow<[u8]> for Buffer {
fn borrow(&self) -> &[u8] {
&*self
}
}
pub struct Iter {
iter: *mut rocksdb_iterator_t,
is_currently_valid: bool,
@ -615,7 +681,7 @@ impl Iter {
fn convert_error(ptr: *const c_char) -> Error {
let message = unsafe {
let s = CStr::from_ptr(ptr).to_string_lossy().into_owned();
libc::free(ptr as *mut c_void);
free(ptr as *mut c_void);
s
};
other_error(message)

@ -666,12 +666,7 @@ mod tests {
impl MemoryStrStore {
fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(term, encoded, |h, v| {
self.insert_str(h, v);
let r: Result<(), Infallible> = Ok(());
r
})
.unwrap();
insert_term(term, encoded, &mut |h, v| self.insert_str(h, v))
}
fn insert_str(&self, key: &StrHash, value: &str) {

@ -5,43 +5,33 @@ use crate::io::{
DatasetFormat, DatasetParser, DatasetSerializer, GraphFormat, GraphParser, GraphSerializer,
};
use crate::model::{GraphNameRef, Quad, Triple};
use crate::storage::StorageLike;
use std::io;
use std::io::{BufRead, Write};
use crate::storage::StorageWriter;
use std::io::{BufRead, Result, Write};
pub fn load_graph<S: StorageLike>(
store: &S,
pub fn load_graph(
writer: &mut StorageWriter,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: GraphNameRef<'_>,
base_iri: Option<&str>,
) -> Result<(), StoreOrParseError<S::Error>> {
) -> Result<()> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| StoreOrParseError::Parse(invalid_input_error(e)))?;
.map_err(invalid_input_error)?;
}
for t in parser
.read_triples(reader)
.map_err(StoreOrParseError::Parse)?
{
store
.insert(
t.map_err(StoreOrParseError::Parse)?
.as_ref()
.in_graph(to_graph_name),
)
.map_err(StoreOrParseError::Store)?;
for t in parser.read_triples(reader)? {
writer.insert(t?.as_ref().in_graph(to_graph_name))?;
}
Ok(())
}
pub fn dump_graph(
triples: impl Iterator<Item = io::Result<Triple>>,
triples: impl Iterator<Item = Result<Triple>>,
writer: impl Write,
format: GraphFormat,
) -> io::Result<()> {
) -> Result<()> {
let mut writer = GraphSerializer::from_format(format).triple_writer(writer)?;
for triple in triples {
writer.write(&triple?)?;
@ -49,50 +39,32 @@ pub fn dump_graph(
writer.finish()
}
pub fn load_dataset<S: StorageLike>(
store: &S,
pub fn load_dataset(
writer: &mut StorageWriter,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> Result<(), StoreOrParseError<S::Error>> {
) -> Result<()> {
let mut parser = DatasetParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(|e| StoreOrParseError::Parse(invalid_input_error(e)))?;
.map_err(invalid_input_error)?;
}
for t in parser
.read_quads(reader)
.map_err(StoreOrParseError::Parse)?
{
store
.insert(t.map_err(StoreOrParseError::Parse)?.as_ref())
.map_err(StoreOrParseError::Store)?;
for t in parser.read_quads(reader)? {
writer.insert(t?.as_ref())?;
}
Ok(())
}
pub fn dump_dataset(
quads: impl Iterator<Item = io::Result<Quad>>,
quads: impl Iterator<Item = Result<Quad>>,
writer: impl Write,
format: DatasetFormat,
) -> io::Result<()> {
) -> Result<()> {
let mut writer = DatasetSerializer::from_format(format).quad_writer(writer)?;
for quad in quads {
writer.write(&quad?)?;
}
writer.finish()
}
pub enum StoreOrParseError<S> {
Store(S),
Parse(io::Error),
}
impl From<StoreOrParseError<Self>> for io::Error {
fn from(error: StoreOrParseError<Self>) -> Self {
match error {
StoreOrParseError::Store(error) | StoreOrParseError::Parse(error) => error,
}
}
}

@ -11,7 +11,7 @@ use crate::storage::numeric_encoder::{
};
use backend::{
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator,
MergeOperator, WriteBatchWithIndex,
};
use std::ffi::CString;
use std::io::Result;
@ -36,6 +36,7 @@ const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs";
const DEFAULT_CF: &str = "default";
const AUTO_WRITE_BATCH_THRESHOLD: usize = 1024;
/// Low level storage primitives
#[derive(Clone)]
@ -211,34 +212,42 @@ impl Storage {
let mut version = this.ensure_version()?;
if version == 0 {
let mut batch = this.db.new_batch();
// We migrate to v1
for quad in this.quads() {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
this.db
.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?;
batch.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name));
if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD {
this.db.write(&mut batch)?;
}
}
}
this.db.write(&mut batch)?;
this.db.flush(&this.graphs_cf)?;
version = 1;
this.set_version(version)?;
this.db.flush(&this.default_cf)?;
this.update_version(version)?;
}
if version == 1 {
// We migrate to v2
let mut batch = this.db.new_batch();
let mut iter = this.db.iter(&this.id2str_cf);
while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&i32::MAX.to_be_bytes());
new_value.extend_from_slice(value);
this.db.insert(&this.id2str_cf, key, &new_value, false)?;
batch.insert(&this.id2str_cf, key, &new_value);
iter.next();
if batch.len() >= AUTO_WRITE_BATCH_THRESHOLD {
this.db.write(&mut batch)?;
batch.clear();
}
}
this.db.write(&mut batch)?;
iter.status()?;
this.db.flush(&this.id2str_cf)?;
version = 2;
this.set_version(version)?;
this.db.flush(&this.default_cf)?;
this.update_version(version)?;
}
match version {
@ -261,20 +270,17 @@ impl Storage {
buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer)
} else {
self.set_version(LATEST_STORAGE_VERSION)?;
self.update_version(LATEST_STORAGE_VERSION)?;
LATEST_STORAGE_VERSION
},
)
}
fn set_version(&self, version: u64) -> Result<()> {
self.db.insert(
&self.default_cf,
b"oxversion",
&version.to_be_bytes(),
false,
)?;
Ok(())
fn update_version(&self, version: u64) -> Result<()> {
let mut batch = self.db.new_batch();
batch.insert(&self.default_cf, b"oxversion", &version.to_be_bytes());
self.db.write(&mut batch)?;
self.db.flush(&self.default_cf)
}
pub fn len(&self) -> Result<usize> {
@ -589,241 +595,22 @@ impl Storage {
}
}
pub fn insert(&self, quad: QuadRef<'_>) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
let encoded = quad.into();
Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &encoded);
if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? {
false
} else {
self.insert_quad_triple(quad, &encoded)?;
self.db
.insert_empty(&self.dspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_pos_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.dpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_osp_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.dosp_cf, buffer.as_slice(), false)?;
buffer.clear();
true
}
} else {
write_spog_quad(&mut buffer, &encoded);
if self.db.contains_key(&self.spog_cf, buffer.as_slice())? {
false
} else {
self.insert_quad_triple(quad, &encoded)?;
self.db
.insert_empty(&self.spog_cf, buffer.as_slice(), false)?;
buffer.clear();
write_posg_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.posg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_ospg_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.ospg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gspo_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.gspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gpos_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.gpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gosp_quad(&mut buffer, &encoded);
self.db
.insert_empty(&self.gosp_cf, buffer.as_slice(), false)?;
buffer.clear();
write_term(&mut buffer, &encoded.graph_name);
if !self.db.contains_key(&self.graphs_cf, &buffer)? {
self.db.insert_empty(&self.graphs_cf, &buffer, false)?;
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
true
}
})
}
pub fn remove(&self, quad: QuadRef<'_>) -> Result<bool> {
self.remove_encoded(&quad.into())
}
fn remove_encoded(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? {
self.db.remove(&self.dspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_pos_quad(&mut buffer, quad);
self.db.remove(&self.dpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_osp_quad(&mut buffer, quad);
self.db.remove(&self.dosp_cf, buffer.as_slice(), false)?;
buffer.clear();
self.remove_quad_triple(quad)?;
true
} else {
false
}
} else {
write_spog_quad(&mut buffer, quad);
if self.db.contains_key(&self.spog_cf, buffer.as_slice())? {
self.db.remove(&self.spog_cf, buffer.as_slice(), false)?;
buffer.clear();
write_posg_quad(&mut buffer, quad);
self.db.remove(&self.posg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_ospg_quad(&mut buffer, quad);
self.db.remove(&self.ospg_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gspo_quad(&mut buffer, quad);
self.db.remove(&self.gspo_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gpos_quad(&mut buffer, quad);
self.db.remove(&self.gpos_cf, buffer.as_slice(), false)?;
buffer.clear();
write_gosp_quad(&mut buffer, quad);
self.db.remove(&self.gosp_cf, buffer.as_slice(), false)?;
buffer.clear();
self.remove_quad_triple(quad)?;
true
} else {
false
}
})
}
pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
let encoded_graph_name = graph_name.into();
let encoded = encode_term(&encoded_graph_name);
Ok(if self.db.contains_key(&self.graphs_cf, &encoded)? {
false
} else {
self.db.insert_empty(&self.graphs_cf, &encoded, false)?;
self.insert_term(graph_name.into(), &encoded_graph_name)?;
true
})
}
pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> Result<()> {
for quad in self.quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_named_graphs(&self) -> Result<()> {
for quad in self.quads_in_named_graph() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_graphs(&self) -> Result<()> {
for quad in self.quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
self.remove_encoded_named_graph(&graph_name.into())
}
fn remove_encoded_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool> {
for quad in self.quads_for_graph(graph_name) {
self.remove_encoded(&quad?)?;
}
let encoded_graph = encode_term(graph_name);
Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? {
self.db.remove(&self.graphs_cf, &encoded_graph, false)?;
self.remove_term(graph_name)?;
true
} else {
false
})
}
pub fn remove_all_named_graphs(&self) -> Result<()> {
for graph_name in self.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
pub fn atomic_writer(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
batch: self.db.new_batch(),
storage: self.clone(),
auto_commit: false,
}
Ok(())
}
pub fn clear(&self) -> Result<()> {
for graph_name in self.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
for quad in self.quads() {
self.remove_encoded(&quad?)?;
pub fn simple_writer(&self) -> StorageWriter {
StorageWriter {
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE),
batch: self.db.new_batch(),
storage: self.clone(),
auto_commit: true,
}
Ok(())
}
fn insert_term(&self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {
insert_term(term, encoded, |key, value| self.insert_str(key, value))
}
fn insert_graph_name(&self, graph_name: GraphNameRef<'_>, encoded: &EncodedTerm) -> Result<()> {
match graph_name {
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::DefaultGraph => Ok(()),
}
}
fn insert_quad_triple(&self, quad: QuadRef<'_>, encoded: &EncodedQuad) -> Result<()> {
self.insert_term(quad.subject.into(), &encoded.subject)?;
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
self.insert_term(quad.object, &encoded.object)?;
Ok(())
}
fn remove_term(&self, encoded: &EncodedTerm) -> Result<()> {
remove_term(encoded, |key| self.remove_str(key))
}
fn remove_quad_triple(&self, encoded: &EncodedQuad) -> Result<()> {
self.remove_term(&encoded.subject)?;
self.remove_term(&encoded.predicate)?;
self.remove_term(&encoded.object)?;
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
@ -879,23 +666,6 @@ impl Storage {
i32::from_be_bytes(v[..4].try_into().unwrap()) > 0
}))
}
fn insert_str(&self, key: &StrHash, value: &str) -> Result<()> {
let mut buffer = Vec::with_capacity(value.len() + 4);
buffer.extend_from_slice(&1_i32.to_be_bytes());
buffer.extend_from_slice(value.as_bytes());
self.db
.merge(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)
}
fn remove_str(&self, key: &StrHash) -> Result<()> {
self.db.merge(
&self.id2str_cf,
&key.to_be_bytes(),
&(-1_i32).to_be_bytes(),
true,
)
}
}
pub struct ChainedDecodingQuadIterator {
@ -968,12 +738,6 @@ impl Iterator for DecodingGraphIterator {
}
}
pub trait StorageLike: StrLookup<Error = std::io::Error> {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool>;
fn remove(&self, quad: QuadRef<'_>) -> Result<bool>;
}
impl StrLookup for Storage {
type Error = std::io::Error;
@ -986,13 +750,286 @@ impl StrLookup for Storage {
}
}
impl StorageLike for Storage {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool> {
self.insert(quad)
pub struct StorageWriter {
buffer: Vec<u8>,
batch: WriteBatchWithIndex,
storage: Storage,
auto_commit: bool,
}
impl StorageWriter {
pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool> {
let encoded = quad.into();
self.buffer.clear();
let result = if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, &encoded);
if self
.batch
.contains_key(&self.storage.dspo_cf, &self.buffer)?
{
false
} else {
self.batch.insert_empty(&self.storage.dspo_cf, &self.buffer);
self.buffer.clear();
write_pos_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.dpos_cf, &self.buffer);
self.buffer.clear();
write_osp_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.dosp_cf, &self.buffer);
self.insert_term(quad.subject.into(), &encoded.subject);
self.insert_term(quad.predicate.into(), &encoded.predicate);
self.insert_term(quad.object, &encoded.object);
true
}
} else {
write_spog_quad(&mut self.buffer, &encoded);
if self
.batch
.contains_key(&self.storage.spog_cf, &self.buffer)?
{
false
} else {
self.batch.insert_empty(&self.storage.spog_cf, &self.buffer);
self.buffer.clear();
write_posg_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.posg_cf, &self.buffer);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.ospg_cf, &self.buffer);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gspo_cf, &self.buffer);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gpos_cf, &self.buffer);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, &encoded);
self.batch.insert_empty(&self.storage.gosp_cf, &self.buffer);
self.insert_term(quad.subject.into(), &encoded.subject);
self.insert_term(quad.predicate.into(), &encoded.predicate);
self.insert_term(quad.object, &encoded.object);
self.buffer.clear();
write_term(&mut self.buffer, &encoded.graph_name);
if !self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
{
self.batch
.insert_empty(&self.storage.graphs_cf, &self.buffer);
self.insert_graph_name(quad.graph_name, &encoded.graph_name);
}
true
}
};
self.write_if_needed()?;
Ok(result)
}
pub fn insert_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
let encoded_graph_name = graph_name.into();
self.buffer.clear();
write_term(&mut self.buffer, &encoded_graph_name);
let result = if self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
{
false
} else {
self.batch
.insert_empty(&self.storage.graphs_cf, &self.buffer);
self.insert_term(graph_name.into(), &encoded_graph_name);
true
};
self.write_if_needed()?;
Ok(result)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(term, encoded, &mut |key, value| self.insert_str(key, value))
}
fn insert_graph_name(&mut self, graph_name: GraphNameRef<'_>, encoded: &EncodedTerm) {
match graph_name {
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
GraphNameRef::DefaultGraph => (),
}
}
fn insert_str(&mut self, key: &StrHash, value: &str) {
self.buffer.clear();
self.buffer.extend_from_slice(&1_i32.to_be_bytes());
self.buffer.extend_from_slice(value.as_bytes());
self.batch
.merge(&self.storage.id2str_cf, &key.to_be_bytes(), &self.buffer);
}
fn remove(&self, quad: QuadRef<'_>) -> Result<bool> {
self.remove(quad)
pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool> {
self.remove_encoded(&quad.into())
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<bool> {
self.buffer.clear();
let result = if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
if self
.batch
.contains_key(&self.storage.dspo_cf, &self.buffer)?
{
self.batch.remove(&self.storage.dspo_cf, &self.buffer);
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.dpos_cf, &self.buffer);
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.dosp_cf, &self.buffer);
self.remove_term(&quad.subject);
self.remove_term(&quad.predicate);
self.remove_term(&quad.object);
true
} else {
false
}
} else {
write_spog_quad(&mut self.buffer, quad);
if self
.batch
.contains_key(&self.storage.spog_cf, &self.buffer)?
{
self.batch.remove(&self.storage.spog_cf, &self.buffer);
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.posg_cf, &self.buffer);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.ospg_cf, &self.buffer);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gspo_cf, &self.buffer);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gpos_cf, &self.buffer);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.batch.remove(&self.storage.gosp_cf, &self.buffer);
self.remove_term(&quad.subject);
self.remove_term(&quad.predicate);
self.remove_term(&quad.object);
true
} else {
false
}
};
self.write_if_needed()?;
Ok(result)
}
pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<()> {
for quad in self.storage.quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_named_graphs(&mut self) -> Result<()> {
for quad in self.storage.quads_in_named_graph() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn clear_all_graphs(&mut self) -> Result<()> {
for quad in self.storage.quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
pub fn remove_named_graph(&mut self, graph_name: NamedOrBlankNodeRef<'_>) -> Result<bool> {
self.remove_encoded_named_graph(&graph_name.into())
}
fn remove_encoded_named_graph(&mut self, graph_name: &EncodedTerm) -> Result<bool> {
for quad in self.storage.quads_for_graph(graph_name) {
self.remove_encoded(&quad?)?;
}
self.buffer.clear();
write_term(&mut self.buffer, graph_name);
let result = if self
.batch
.contains_key(&self.storage.graphs_cf, &self.buffer)?
{
self.batch.remove(&self.storage.graphs_cf, &self.buffer);
self.remove_term(graph_name);
true
} else {
false
};
self.write_if_needed()?;
Ok(result)
}
pub fn remove_all_named_graphs(&mut self) -> Result<()> {
for graph_name in self.storage.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
Ok(())
}
pub fn clear(&mut self) -> Result<()> {
for graph_name in self.storage.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
for quad in self.storage.quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}
fn remove_term(&mut self, encoded: &EncodedTerm) {
remove_term(encoded, &mut |key| self.remove_str(key));
}
fn remove_str(&mut self, key: &StrHash) {
self.batch.merge(
&self.storage.id2str_cf,
&key.to_be_bytes(),
&(-1_i32).to_be_bytes(),
)
}
fn write_if_needed(&mut self) -> Result<()> {
if self.auto_commit && self.batch.len() > AUTO_WRITE_BATCH_THRESHOLD {
self.commit()?;
}
Ok(())
}
pub fn commit(&mut self) -> Result<()> {
self.storage.db.write(&mut self.batch)?;
Ok(())
}
}
@ -1017,11 +1054,11 @@ mod tests {
);
let storage = Storage::new()?;
storage.insert(quad)?;
storage.insert(quad2)?;
storage.remove(quad2)?;
storage.flush()?;
storage.db.compact(&storage.id2str_cf)?;
let mut writer = storage.atomic_writer();
writer.insert(quad)?;
writer.insert(quad2)?;
writer.remove(quad2)?;
writer.commit()?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_some());
@ -1031,7 +1068,8 @@ mod tests {
assert!(storage
.get_str(&StrHash::new("http://example.com/o2"))?
.is_none());
storage.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.clear_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.commit()?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_none());
@ -1044,7 +1082,8 @@ mod tests {
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_some());
storage.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.remove_named_graph(NamedNodeRef::new_unchecked("http://example.com/g").into())?;
writer.commit()?;
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_none());

@ -665,11 +665,11 @@ pub trait StrLookup {
fn contains_str(&self, key: &StrHash) -> Result<bool, Self::Error>;
}
pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
pub fn insert_term<F: FnMut(&StrHash, &str)>(
term: TermRef<'_>,
encoded: &EncodedTerm,
insert_str: F,
) -> Result<(), E> {
insert_str: &mut F,
) {
match term {
TermRef::NamedNode(node) => {
if let EncodedTerm::NamedNode { iri_id } = encoded {
@ -680,7 +680,7 @@ pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
}
TermRef::BlankNode(node) => match encoded {
EncodedTerm::BigBlankNode { id_id } => insert_str(id_id, node.as_str()),
EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => Ok(()),
EncodedTerm::SmallBlankNode(..) | EncodedTerm::NumericalBlankNode { .. } => (),
_ => unreachable!("Invalid term encoding {:?} for {}", encoded, term),
},
TermRef::Literal(literal) => match encoded {
@ -699,7 +699,7 @@ pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
value_id,
language_id,
} => {
insert_str(value_id, literal.value())?;
insert_str(value_id, literal.value());
if let Some(language) = literal.language() {
insert_str(language_id, language)
} else {
@ -713,7 +713,7 @@ pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
value_id,
datatype_id,
} => {
insert_str(value_id, literal.value())?;
insert_str(value_id, literal.value());
insert_str(datatype_id, literal.datatype().as_str())
}
EncodedTerm::SmallStringLiteral(..)
@ -733,17 +733,17 @@ pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
| EncodedTerm::GMonthLiteral(..)
| EncodedTerm::DurationLiteral(..)
| EncodedTerm::YearMonthDurationLiteral(..)
| EncodedTerm::DayTimeDurationLiteral(..) => Ok(()),
| EncodedTerm::DayTimeDurationLiteral(..) => (),
_ => unreachable!("Invalid term encoding {:?} for {}", encoded, term),
},
TermRef::Triple(triple) => {
if let EncodedTerm::Triple(encoded) = encoded {
insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str)?;
insert_term(triple.subject.as_ref().into(), &encoded.subject, insert_str);
insert_term(
triple.predicate.as_ref().into(),
&encoded.predicate,
insert_str,
)?;
);
insert_term(triple.object.as_ref(), &encoded.object, insert_str)
} else {
unreachable!("Invalid term encoding {:?} for {}", encoded, term)
@ -752,10 +752,7 @@ pub fn insert_term<E, F: Fn(&StrHash, &str) -> Result<(), E> + Copy>(
}
}
pub fn remove_term<E, F: Fn(&StrHash) -> Result<(), E> + Copy>(
encoded: &EncodedTerm,
remove_str: F,
) -> Result<(), E> {
pub fn remove_term<F: FnMut(&StrHash)>(encoded: &EncodedTerm, remove_str: &mut F) {
match encoded {
EncodedTerm::NamedNode { iri_id } => remove_str(iri_id),
EncodedTerm::BigBlankNode { id_id } => remove_str(id_id),
@ -766,7 +763,7 @@ pub fn remove_term<E, F: Fn(&StrHash) -> Result<(), E> + Copy>(
value_id,
language_id,
} => {
remove_str(value_id)?;
remove_str(value_id);
remove_str(language_id)
}
EncodedTerm::SmallTypedLiteral { datatype_id, .. } => remove_str(datatype_id),
@ -774,15 +771,15 @@ pub fn remove_term<E, F: Fn(&StrHash) -> Result<(), E> + Copy>(
value_id,
datatype_id,
} => {
remove_str(value_id)?;
remove_str(value_id);
remove_str(datatype_id)
}
EncodedTerm::Triple(encoded) => {
remove_term(&encoded.subject, remove_str)?;
remove_term(&encoded.predicate, remove_str)?;
remove_term(&encoded.subject, remove_str);
remove_term(&encoded.predicate, remove_str);
remove_term(&encoded.object, remove_str)
}
_ => Ok(()),
_ => (),
}
}

@ -230,10 +230,8 @@ impl Store {
/// Loads a graph file (i.e. triples) into the store
///
/// Warning: This functions saves the triples in a not atomic way.
/// If the parsing fails in the middle of the file only a part of it may be written to the store.
/// It might leave the store in a bad state if a crash happens during a triple insertion.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
/// Warning: This function is not atomic. If an error happens during the file parsing, only a
/// part of the file might be written to the store.
///
/// Usage example:
/// ```
@ -263,22 +261,15 @@ impl Store {
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
load_graph(
&self.storage,
reader,
format,
to_graph_name.into(),
base_iri,
)?;
Ok(())
let mut writer = self.storage.simple_writer();
load_graph(&mut writer, reader, format, to_graph_name.into(), base_iri)?;
writer.commit()
}
/// Loads a dataset file (i.e. quads) into the store.
///
/// Warning: This functions saves the triples in a not atomic way.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// It might leave the store in a bad state if a crash happens during a quad insertion.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
/// Warning: This function is not atomic. If an error happens during the file parsing, only a
/// part of the file might be written to the store.
///
/// Usage example:
/// ```
@ -307,18 +298,15 @@ impl Store {
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
load_dataset(&self.storage, reader, format, base_iri)?;
Ok(())
let mut writer = self.storage.simple_writer();
load_dataset(&mut writer, reader, format, base_iri)?;
writer.commit()
}
/// Adds a quad to this store.
///
/// Returns `true` if the quad was not already in the store.
///
/// This method is optimized for performances and is not atomic.
/// It might leave the store in a bad state if a crash happens during the insertion.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
@ -334,17 +322,16 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
self.storage.insert(quad.into())
let mut writer = self.storage.atomic_writer();
let result = writer.insert(quad.into())?;
writer.commit()?;
Ok(result)
}
/// Removes a quad from this store.
///
/// Returns `true` if the quad was in the store and has been removed.
///
/// This method is optimized for performances and is not atomic.
/// It might leave the store in a bad state if a crash happens during the removal.
/// Use a (memory greedy) [transaction](Store::transaction()) if you do not want that.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
@ -361,7 +348,10 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn remove<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
self.storage.remove(quad.into())
let mut writer = self.storage.atomic_writer();
let result = writer.remove(quad.into())?;
writer.commit()?;
Ok(result)
}
/// Dumps a store graph into a file.
@ -478,7 +468,10 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
self.storage.insert_named_graph(graph_name.into())
let mut writer = self.storage.atomic_writer();
let result = writer.insert_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
}
/// Clears a graph from this store.
@ -500,7 +493,9 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear_graph<'a>(&self, graph_name: impl Into<GraphNameRef<'a>>) -> io::Result<()> {
self.storage.clear_graph(graph_name.into())
let mut writer = self.storage.simple_writer();
writer.clear_graph(graph_name.into())?;
writer.commit()
}
/// Removes a graph from this store.
@ -527,7 +522,10 @@ impl Store {
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> io::Result<bool> {
self.storage.remove_named_graph(graph_name.into())
let mut writer = self.storage.simple_writer();
let result = writer.remove_named_graph(graph_name.into())?;
writer.commit()?;
Ok(result)
}
/// Clears the store.
@ -548,7 +546,9 @@ impl Store {
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn clear(&self) -> io::Result<()> {
self.storage.clear()
let mut writer = self.storage.simple_writer();
writer.clear()?;
writer.commit()
}
/// Flushes all buffers and ensures that all writes are saved on disk.
@ -599,14 +599,19 @@ impl Store {
/// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
/// Errors related to data loading into the store use the other error kinds.
#[cfg(not(target_arch = "wasm32"))]
pub fn create_from_dataset(
path: &Path,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> io::Result<()> {
let storage = Storage::open(path.as_ref(), false)?;
load_dataset(&storage, reader, format, base_iri)?;
let storage = Storage::open(path, false)?;
{
let mut writer = storage.simple_writer();
load_dataset(&mut writer, reader, format, base_iri)?;
writer.commit()?;
}
storage.flush()?;
storage.compact()
}

Loading…
Cancel
Save