Moves back from Sled to RocksDB

Simple implementation without transactions
pull/171/head
Tpt 3 years ago
parent 2a82fe4f75
commit 063683088d
  1. 3
      .gitignore
  2. 2
      README.md
  3. 2
      lib/Cargo.toml
  4. 131
      lib/src/storage/fallback_backend.rs
  5. 732
      lib/src/storage/mod.rs
  6. 166
      lib/src/storage/rocksdb_backend.rs
  7. 113
      lib/src/storage/sled_backend.rs
  8. 263
      lib/src/store.rs
  9. BIN
      lib/tests/rocksdb_bc_data/000003.log
  10. 1
      lib/tests/rocksdb_bc_data/CURRENT
  11. 1
      lib/tests/rocksdb_bc_data/IDENTITY
  12. 0
      lib/tests/rocksdb_bc_data/LOCK
  13. BIN
      lib/tests/rocksdb_bc_data/MANIFEST-000004
  14. 964
      lib/tests/rocksdb_bc_data/OPTIONS-000026
  15. 4
      lib/tests/sled_bc_data/conf
  16. BIN
      lib/tests/sled_bc_data/db
  17. 39
      lib/tests/store.rs
  18. 2
      python/src/store.rs
  19. 2
      server/README.md

3
.gitignore vendored

@ -4,6 +4,5 @@ Cargo.lock
.idea
*.iml
js/node_modules
lib/tests/rockdb_bc_data
lib/tests/sled_bc_data
lib/tests/rocksdb_bc_data
venv

@ -10,7 +10,7 @@ Oxigraph
Oxigraph is a graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard.
Its goal is to provide a compliant, safe, and fast graph database based on the [Sled](https://sled.rs/) key-value store.
Its goal is to provide a compliant, safe, and fast graph database based on the [RocksDB](https://rocksdb.org/) key-value store.
It is written in Rust.
It also provides a set of utility functions for reading, writing, and processing RDF files.

@ -43,7 +43,7 @@ json-event-parser = "0.1"
spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "0.34"
rocksdb = { version = "0.17", default-features = false }
oxhttp = { version = "0.1", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies]

@ -1,4 +1,3 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::io::Result;
use std::sync::{Arc, Mutex, RwLock};
@ -10,21 +9,15 @@ pub struct Db {
}
impl Db {
pub fn new() -> Result<Self> {
pub fn new(_column_families: &[&str]) -> Result<Self> {
Ok(Self {
trees: Arc::default(),
default: Tree::new(),
default: Tree::default(),
})
}
pub fn open_tree(&self, name: &'static str) -> Result<Tree> {
Ok(self
.trees
.lock()
.unwrap()
.entry(name)
.or_insert_with(Tree::new)
.clone())
pub fn open_tree(&self, name: &'static str) -> Tree {
self.trees.lock().unwrap().entry(name).or_default().clone()
}
pub fn flush(&self) -> Result<()> {
@ -35,97 +28,35 @@ impl Db {
self.default.get(key)
}
pub fn insert(&self, key: &[u8], value: impl Into<Vec<u8>>) -> Result<bool> {
self.default.insert(key.into(), value)
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.default.insert(key, value)
}
}
#[derive(Clone)]
#[derive(Clone, Default)]
pub struct Tree {
tree: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
merge_operator: Arc<dyn Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static>,
}
impl Tree {
fn new() -> Self {
Self {
tree: Arc::default(),
merge_operator: Arc::new(|_, _, v| Some(v.into())),
}
}
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(
self.tree.read().unwrap().get(key).map(|v| v.clone()), //TODO: avoid clone
)
Ok(self.tree.read().unwrap().get(key).map(|v| v.to_vec()))
}
pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
Ok(self.tree.read().unwrap().contains_key(key.as_ref()))
}
pub fn insert(&self, key: &[u8], value: impl Into<Vec<u8>>) -> Result<bool> {
Ok(self
.tree
.write()
.unwrap()
.insert(key.into(), value.into())
.is_none())
}
pub fn insert_empty(&self, key: &[u8]) -> Result<bool> {
self.insert(key, [])
}
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<()> {
match self.tree.write().unwrap().entry(key.into()) {
Entry::Occupied(e) => match (self.merge_operator)(key.as_ref(), Some(e.get()), value) {
Some(v) => {
*e.into_mut() = v;
}
None => {
e.remove();
}
},
Entry::Vacant(e) => {
if let Some(v) = (self.merge_operator)(key.as_ref(), None, value) {
e.insert(v);
}
}
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.tree.write().unwrap().insert(key.into(), value.into());
Ok(())
}
pub fn remove(&self, key: &[u8]) -> Result<bool> {
Ok(self.tree.write().unwrap().remove(key.as_ref()).is_some())
pub fn insert_empty(&self, key: &[u8]) -> Result<()> {
self.insert(key, &[])
}
pub fn update_and_fetch<V: Into<Vec<u8>>>(
&self,
key: &[u8],
mut f: impl FnMut(Option<&[u8]>) -> Option<V>,
) -> Result<Option<Vec<u8>>> {
Ok(match self.tree.write().unwrap().entry(key.into()) {
Entry::Occupied(e) => match f(Some(e.get())) {
Some(v) => {
let v = v.into();
let e_mut = e.into_mut();
e_mut.clear();
e_mut.extend_from_slice(&v);
Some(v)
}
None => {
e.remove();
None
}
},
Entry::Vacant(e) => match f(None) {
Some(v) => {
let v = v.into();
e.insert(v.clone());
Some(v)
}
None => None,
},
})
pub fn remove(&self, key: &[u8]) -> Result<bool> {
Ok(self.tree.write().unwrap().remove(key.as_ref()).is_some())
}
pub fn clear(&self) -> Result<()> {
@ -139,16 +70,16 @@ impl Tree {
pub fn scan_prefix(&self, prefix: &[u8]) -> Iter {
let tree = self.tree.read().unwrap();
let data: Vec<_> = if prefix.is_empty() {
tree.iter()
.map(|(k, v)| Ok((k.clone(), v.clone())))
.collect()
tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} else {
tree.range(prefix.to_vec()..)
.take_while(|(k, _)| k.starts_with(prefix))
.map(|(k, v)| Ok((k.clone(), v.clone())))
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
};
data.into_iter()
let mut iter = data.into_iter();
let current = iter.next();
Iter { iter, current }
}
pub fn len(&self) -> usize {
@ -158,17 +89,23 @@ impl Tree {
pub fn is_empty(&self) -> bool {
self.tree.read().unwrap().is_empty()
}
}
pub fn flush(&self) -> Result<()> {
Ok(())
pub struct Iter {
iter: std::vec::IntoIter<(Vec<u8>, Vec<u8>)>,
current: Option<(Vec<u8>, Vec<u8>)>,
}
impl Iter {
pub fn key(&self) -> Option<&[u8]> {
Some(&self.current.as_ref()?.0)
}
pub fn set_merge_operator(
&mut self,
merge_operator: impl Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static,
) {
self.merge_operator = Arc::new(merge_operator)
pub fn value(&self) -> Option<&[u8]> {
Some(&self.current.as_ref()?.1)
}
}
pub type Iter = std::vec::IntoIter<Result<(Vec<u8>, Vec<u8>)>>;
pub fn next(&mut self) {
self.current = self.iter.next();
}
}

@ -1,29 +1,17 @@
use std::error::Error;
use std::fmt;
use std::path::Path;
#[cfg(not(target_arch = "wasm32"))]
use sled::transaction::{
ConflictableTransactionError as Sled2ConflictableTransactionError,
TransactionError as Sled2TransactionError, TransactionalTree,
UnabortableTransactionError as Sled2UnabortableTransactionError,
};
use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef};
use crate::sparql::EvaluationError;
use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding,
LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE,
};
use crate::storage::io::StoreOrParseError;
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
#[cfg(target_arch = "wasm32")]
use fallback_backend::{Db, Iter, Tree};
#[cfg(not(target_arch = "wasm32"))]
use sled_backend::{Db, Iter, Tree};
use rocksdb_backend::{Db, Iter, Tree};
use std::path::Path;
mod binary_encoder;
#[cfg(target_arch = "wasm32")]
@ -31,9 +19,26 @@ mod fallback_backend;
pub mod io;
pub mod numeric_encoder;
#[cfg(not(target_arch = "wasm32"))]
mod sled_backend;
mod rocksdb_backend;
pub mod small_string;
const ID2STR_CF: &str = "id2str";
const SPOG_CF: &str = "spog";
const POSG_CF: &str = "posg";
const OSPG_CF: &str = "ospg";
const GSPO_CF: &str = "gspo";
const GPOS_CF: &str = "gpos";
const GOSP_CF: &str = "gosp";
const DSPO_CF: &str = "dspo";
const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs";
const COLUMN_FAMILIES: [&str; 11] = [
ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF, DSPO_CF, DPOS_CF, DOSP_CF,
GRAPHS_CF,
];
/// Low level storage primitives
#[derive(Clone)]
pub struct Storage {
@ -53,30 +58,27 @@ pub struct Storage {
impl Storage {
pub fn new() -> std::io::Result<Self> {
Self::setup(Db::new()?)
Self::setup(Db::new(&COLUMN_FAMILIES)?)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path) -> std::io::Result<Self> {
Self::setup(Db::open(path)?)
Self::setup(Db::open(path, &COLUMN_FAMILIES)?)
}
fn setup(db: Db) -> std::io::Result<Self> {
let mut id2str = db.open_tree("id2str")?;
id2str.set_merge_operator(id2str_merge);
let this = Self {
id2str,
spog: db.open_tree("spog")?,
posg: db.open_tree("posg")?,
ospg: db.open_tree("ospg")?,
gspo: db.open_tree("gspo")?,
gpos: db.open_tree("gpos")?,
gosp: db.open_tree("gosp")?,
dspo: db.open_tree("dspo")?,
dpos: db.open_tree("dpos")?,
dosp: db.open_tree("dosp")?,
graphs: db.open_tree("graphs")?,
id2str: db.open_tree(ID2STR_CF),
spog: db.open_tree(SPOG_CF),
posg: db.open_tree(POSG_CF),
ospg: db.open_tree(OSPG_CF),
gspo: db.open_tree(GSPO_CF),
gpos: db.open_tree(GPOS_CF),
gosp: db.open_tree(GOSP_CF),
dspo: db.open_tree(DSPO_CF),
dpos: db.open_tree(DPOS_CF),
dosp: db.open_tree(DOSP_CF),
graphs: db.open_tree(GRAPHS_CF),
default: db,
};
@ -86,7 +88,7 @@ impl Storage {
for quad in this.quads() {
let quad = quad?;
if !quad.graph_name.is_default_graph() {
this.insert_encoded_named_graph(&quad.graph_name)?;
this.graphs.insert_empty(&encode_term(&quad.graph_name))?;
}
}
version = 1;
@ -95,26 +97,27 @@ impl Storage {
}
if version == 1 {
// We migrate to v2
for entry in this.id2str.iter() {
let (key, value) = entry?;
let mut iter = this.id2str.iter();
while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes());
new_value.extend_from_slice(&value);
this.id2str.insert(&key, new_value)?;
new_value.extend_from_slice(value);
this.id2str.insert(key, &new_value)?;
iter.next();
}
version = 2;
this.set_version(version)?;
this.id2str.flush()?;
this.default.flush()?;
}
match version {
_ if version < LATEST_STORAGE_VERSION => Err(invalid_data_error(format!(
"The Sled database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version",
"The RocksDB database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version",
version
))),
LATEST_STORAGE_VERSION => Ok(this),
_ => Err(invalid_data_error(format!(
"The Sled database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database",
"The RocksDB database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database",
version
)))
}
@ -132,50 +135,10 @@ impl Storage {
}
fn set_version(&self, version: u64) -> std::io::Result<()> {
self.default
.insert(b"oxversion", version.to_be_bytes().to_vec())?;
self.default.insert(b"oxversion", &version.to_be_bytes())?;
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub fn transaction<T, E>(
&self,
f: impl Fn(StorageTransaction<'_>) -> Result<T, ConflictableTransactionError<E>>,
) -> Result<T, TransactionError<E>> {
use sled::Transactional;
Ok((
self.id2str.as_sled(),
self.spog.as_sled(),
self.posg.as_sled(),
self.ospg.as_sled(),
self.gspo.as_sled(),
self.gpos.as_sled(),
self.gosp.as_sled(),
self.dspo.as_sled(),
self.dpos.as_sled(),
self.dosp.as_sled(),
self.graphs.as_sled(),
)
.transaction(
move |(id2str, spog, posg, ospg, gspo, gpos, gosp, dspo, dpos, dosp, graphs)| {
Ok(f(StorageTransaction {
id2str,
spog,
posg,
ospg,
gspo,
gpos,
gosp,
dspo,
dpos,
dosp,
graphs,
})?)
},
)?)
}
pub fn len(&self) -> usize {
self.gspo.len() + self.dspo.len()
}
@ -486,13 +449,15 @@ impl Storage {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
let encoded = quad.into();
if quad.graph_name.is_default_graph() {
Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &encoded);
let is_new = self.dspo.insert_empty(buffer.as_slice())?;
if self.dspo.contains_key(buffer.as_slice())? {
false
} else {
self.insert_quad_triple(quad, &encoded)?;
if is_new {
self.dspo.insert_empty(buffer.as_slice())?;
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_pos_quad(&mut buffer, &encoded);
self.dpos.insert_empty(buffer.as_slice())?;
@ -501,16 +466,19 @@ impl Storage {
write_osp_quad(&mut buffer, &encoded);
self.dosp.insert_empty(buffer.as_slice())?;
buffer.clear();
}
Ok(is_new)
true
}
} else {
write_spog_quad(&mut buffer, &encoded);
let is_new = self.spog.insert_empty(buffer.as_slice())?;
if is_new {
buffer.clear();
if self.spog.contains_key(buffer.as_slice())? {
false
} else {
self.insert_quad_triple(quad, &encoded)?;
self.spog.insert_empty(buffer.as_slice())?;
buffer.clear();
write_posg_quad(&mut buffer, &encoded);
self.posg.insert_empty(buffer.as_slice())?;
buffer.clear();
@ -532,14 +500,15 @@ impl Storage {
buffer.clear();
write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert_empty(&buffer)? {
if !self.graphs.contains_key(&buffer)? {
self.graphs.insert_empty(&buffer)?;
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
}
Ok(is_new)
}
true
}
})
}
pub fn remove(&self, quad: QuadRef<'_>) -> std::io::Result<bool> {
@ -549,11 +518,11 @@ impl Storage {
fn remove_encoded(&self, quad: &EncodedQuad) -> std::io::Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
let is_present = self.dspo.remove(buffer.as_slice())?;
if is_present {
if self.dspo.contains_key(buffer.as_slice())? {
self.dspo.remove(buffer.as_slice())?;
buffer.clear();
write_pos_quad(&mut buffer, quad);
@ -565,14 +534,16 @@ impl Storage {
buffer.clear();
self.remove_quad_triple(quad)?;
}
Ok(is_present)
true
} else {
false
}
} else {
write_spog_quad(&mut buffer, quad);
let is_present = self.spog.remove(buffer.as_slice())?;
if is_present {
if self.spog.contains_key(buffer.as_slice())? {
self.spog.remove(buffer.as_slice())?;
buffer.clear();
write_posg_quad(&mut buffer, quad);
@ -596,26 +567,26 @@ impl Storage {
buffer.clear();
self.remove_quad_triple(quad)?;
}
Ok(is_present)
}
true
} else {
false
}
})
}
pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let encoded = graph_name.into();
Ok(if self.insert_encoded_named_graph(&encoded)? {
self.insert_term(graph_name.into(), &encoded)?;
true
} else {
let encoded_graph_name = graph_name.into();
let encoded = encode_term(&encoded_graph_name);
Ok(if self.graphs.contains_key(&encoded)? {
false
} else {
self.graphs.insert_empty(&encoded)?;
self.insert_term(graph_name.into(), &encoded_graph_name)?;
true
})
}
fn insert_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
self.graphs.insert_empty(&encode_term(graph_name))
}
pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> std::io::Result<()> {
for quad in self.quads_for_graph(&graph_name.into()) {
self.remove_encoded(&quad?)?;
@ -642,7 +613,9 @@ impl Storage {
for quad in self.quads_for_graph(&graph_name) {
self.remove_encoded(&quad?)?;
}
Ok(if self.graphs.remove(&encode_term(&graph_name))? {
let encoded_graph = encode_term(&graph_name);
Ok(if self.graphs.contains_key(&encoded_graph)? {
self.graphs.remove(&encoded_graph)?;
self.remove_term(&graph_name)?;
true
} else {
@ -682,12 +655,6 @@ impl Storage {
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn flush_async(&self) -> std::io::Result<()> {
self.default.flush_async().await?;
Ok(())
}
pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> {
self.id2str
.get(&key.to_be_bytes())?
@ -745,10 +712,9 @@ impl Iterator for DecodingQuadIterator {
type Item = std::io::Result<EncodedQuad>;
fn next(&mut self) -> Option<std::io::Result<EncodedQuad>> {
Some(match self.iter.next()? {
Ok((encoded, _)) => self.encoding.decode(&encoded),
Err(error) => Err(error.into()),
})
let term = self.encoding.decode(self.iter.key()?);
self.iter.next();
Some(term)
}
}
@ -760,340 +726,51 @@ impl Iterator for DecodingGraphIterator {
type Item = std::io::Result<EncodedTerm>;
fn next(&mut self) -> Option<std::io::Result<EncodedTerm>> {
Some(match self.iter.next()? {
Ok((encoded, _)) => decode_term(&encoded),
Err(error) => Err(error.into()),
})
let term = decode_term(self.iter.key()?);
self.iter.next();
Some(term)
}
}
#[cfg(not(target_arch = "wasm32"))]
pub struct StorageTransaction<'a> {
id2str: &'a TransactionalTree,
spog: &'a TransactionalTree,
posg: &'a TransactionalTree,
ospg: &'a TransactionalTree,
gspo: &'a TransactionalTree,
gpos: &'a TransactionalTree,
gosp: &'a TransactionalTree,
dspo: &'a TransactionalTree,
dpos: &'a TransactionalTree,
dosp: &'a TransactionalTree,
graphs: &'a TransactionalTree,
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StorageTransaction<'a> {
pub fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let encoded = quad.into();
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &encoded);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_pos_quad(&mut buffer, &encoded);
self.dpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_osp_quad(&mut buffer, &encoded);
self.dosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, &encoded);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_posg_quad(&mut buffer, &encoded);
self.posg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, &encoded);
self.ospg.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gspo_quad(&mut buffer, &encoded);
self.gspo.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gpos_quad(&mut buffer, &encoded);
self.gpos.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_gosp_quad(&mut buffer, &encoded);
self.gosp.insert(buffer.as_slice(), &[])?;
buffer.clear();
write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert(buffer.as_slice(), &[])?.is_none() {
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
}
Ok(is_new)
}
}
pub fn remove(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let quad = EncodedQuad::from(quad);
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &quad);
let is_present = self.dspo.remove(buffer.as_slice())?.is_some();
if is_present {
buffer.clear();
write_pos_quad(&mut buffer, &quad);
self.dpos.remove(buffer.as_slice())?;
buffer.clear();
write_osp_quad(&mut buffer, &quad);
self.dosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(&quad)?;
}
impl TermEncoder for Storage {
type Error = std::io::Error;
Ok(is_present)
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
if let Some(value) = self.id2str.get(&key.to_be_bytes())? {
let mut value = value.to_vec();
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_add(1);
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), &value)?
} else {
write_spog_quad(&mut buffer, &quad);
let is_present = self.spog.remove(buffer.as_slice())?.is_some();
if is_present {
buffer.clear();
write_posg_quad(&mut buffer, &quad);
self.posg.remove(buffer.as_slice())?;
buffer.clear();
write_ospg_quad(&mut buffer, &quad);
self.ospg.remove(buffer.as_slice())?;
buffer.clear();
write_gspo_quad(&mut buffer, &quad);
self.gspo.remove(buffer.as_slice())?;
buffer.clear();
write_gpos_quad(&mut buffer, &quad);
self.gpos.remove(buffer.as_slice())?;
buffer.clear();
write_gosp_quad(&mut buffer, &quad);
self.gosp.remove(buffer.as_slice())?;
buffer.clear();
self.remove_quad_triple(&quad)?;
}
Ok(is_present)
let mut buffer = Vec::with_capacity(value.len() + 4);
buffer.extend_from_slice(&1_u32.to_be_bytes());
buffer.extend_from_slice(value.as_bytes());
self.id2str.insert(&key.to_be_bytes(), &buffer)?;
}
Ok(())
}
pub fn insert_named_graph(
&self,
graph_name: NamedOrBlankNodeRef<'_>,
) -> Result<bool, UnabortableTransactionError> {
let encoded = graph_name.into();
Ok(
if self.graphs.insert(encode_term(&encoded), &[])?.is_none() {
self.insert_term(graph_name.into(), &encoded)?;
true
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
if let Some(value) = self.id2str.get(&key.to_be_bytes())? {
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_sub(1);
if new_number == 0 {
self.id2str.remove(&key.to_be_bytes())?;
} else {
false
},
)
}
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, UnabortableTransactionError> {
self.id2str
.get(key.to_be_bytes())?
.map(|v| String::from_utf8(v[4..].to_vec()))
.transpose()
.map_err(|e| UnabortableTransactionError::Storage(invalid_data_error(e)))
}
pub fn contains_str(&self, key: &StrHash) -> Result<bool, UnabortableTransactionError> {
Ok(self.id2str.get(key.to_be_bytes())?.is_some())
}
}
/// Error returned by a Sled transaction
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum TransactionError<T> {
/// A failure returned by the API user that have aborted the transaction
Abort(T),
/// A storage related error
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: fmt::Display> fmt::Display for TransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Abort(e) => e.fmt(f),
Self::Storage(e) => e.fmt(f),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Error + 'static> Error for TransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Abort(e) => Some(e),
Self::Storage(e) => Some(e),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<Sled2TransactionError<T>> for TransactionError<T> {
fn from(e: Sled2TransactionError<T>) -> Self {
match e {
Sled2TransactionError::Abort(e) => Self::Abort(e),
Sled2TransactionError::Storage(e) => Self::Storage(e.into()),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Into<Self>> From<TransactionError<T>> for std::io::Error {
fn from(e: TransactionError<T>) -> Self {
match e {
TransactionError::Abort(e) => e.into(),
TransactionError::Storage(e) => e,
}
}
}
/// An error returned from the transaction methods.
/// Should be returned as it is
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum UnabortableTransactionError {
#[doc(hidden)]
Conflict,
/// A regular error
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl fmt::Display for UnabortableTransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Conflict => write!(f, "Transaction conflict"),
Self::Storage(e) => e.fmt(f),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl Error for UnabortableTransactionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Storage(e) => Some(e),
_ => None,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<UnabortableTransactionError> for EvaluationError {
fn from(e: UnabortableTransactionError) -> Self {
match e {
UnabortableTransactionError::Storage(e) => Self::Io(e),
UnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<StoreOrParseError<Self>> for UnabortableTransactionError {
fn from(e: StoreOrParseError<Self>) -> Self {
match e {
StoreOrParseError::Store(e) => e,
StoreOrParseError::Parse(e) => Self::Storage(e),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<Sled2UnabortableTransactionError> for UnabortableTransactionError {
fn from(e: Sled2UnabortableTransactionError) -> Self {
match e {
Sled2UnabortableTransactionError::Storage(e) => Self::Storage(e.into()),
Sled2UnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
/// An error returned from the transaction closure
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum ConflictableTransactionError<T> {
/// A failure returned by the user that will abort the transaction
Abort(T),
#[doc(hidden)]
Conflict,
/// A storage related error
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: fmt::Display> fmt::Display for ConflictableTransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Conflict => write!(f, "Transaction conflict"),
Self::Storage(e) => e.fmt(f),
Self::Abort(e) => e.fmt(f),
}
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Error + 'static> Error for ConflictableTransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::Abort(e) => Some(e),
Self::Storage(e) => Some(e),
_ => None,
let mut value = value.to_vec();
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), &value)?;
}
}
Ok(())
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<UnabortableTransactionError> for ConflictableTransactionError<T> {
fn from(e: UnabortableTransactionError) -> Self {
match e {
UnabortableTransactionError::Storage(e) => Self::Storage(e),
UnabortableTransactionError::Conflict => Self::Conflict,
}
}
}
pub trait StorageLike: StrLookup {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<ConflictableTransactionError<T>> for Sled2ConflictableTransactionError<T> {
fn from(e: ConflictableTransactionError<T>) -> Self {
match e {
ConflictableTransactionError::Abort(e) => Self::Abort(e),
ConflictableTransactionError::Conflict => Self::Conflict,
ConflictableTransactionError::Storage(e) => Self::Storage(e.into()),
}
}
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
}
impl StrLookup for Storage {
@ -1108,90 +785,6 @@ impl StrLookup for Storage {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StrLookup for StorageTransaction<'a> {
type Error = UnabortableTransactionError;
fn get_str(&self, key: &StrHash) -> Result<Option<String>, UnabortableTransactionError> {
self.get_str(key)
}
fn contains_str(&self, key: &StrHash) -> Result<bool, UnabortableTransactionError> {
self.contains_str(key)
}
}
impl TermEncoder for Storage {
type Error = std::io::Error;
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
self.id2str.merge(&key.to_be_bytes(), value.as_bytes())?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
self.id2str.update_and_fetch(&key.to_be_bytes(), |old| {
let old = old?;
match u32::from_be_bytes(old[..4].try_into().ok()?) {
0 | 1 => None,
u32::MAX => Some(old.to_vec()),
number => {
let mut value = old.to_vec();
value[..4].copy_from_slice(&(number - 1).to_be_bytes());
Some(value)
}
}
})?;
Ok(())
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> TermEncoder for StorageTransaction<'a> {
type Error = UnabortableTransactionError;
fn insert_str(&self, key: &StrHash, value: &str) -> Result<(), UnabortableTransactionError> {
let new_value = if let Some(old) = self.id2str.get(key.to_be_bytes())? {
let mut new_value = old.to_vec();
let number = u32::from_be_bytes(new_value[..4].try_into().ok().unwrap_or_default());
new_value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check
new_value
} else {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&1_u32.to_be_bytes());
new_value.extend_from_slice(value.as_bytes());
new_value
};
self.id2str.insert(&key.to_be_bytes(), new_value)?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> Result<(), UnabortableTransactionError> {
if let Some(old) = self.id2str.get(key.to_be_bytes())? {
if let Ok(number) = old[..4].try_into() {
match u32::from_be_bytes(number) {
0 | 1 => {
self.id2str.remove(&key.to_be_bytes())?;
}
u32::MAX => (),
number => {
let mut value = old;
value[..4].copy_from_slice(&(number - 1).to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), value)?;
}
}
}
}
Ok(())
}
}
pub trait StorageLike: StrLookup {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, Self::Error>;
}
impl StorageLike for Storage {
fn insert(&self, quad: QuadRef<'_>) -> std::io::Result<bool> {
self.insert(quad)
@ -1202,35 +795,6 @@ impl StorageLike for Storage {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StorageLike for StorageTransaction<'a> {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
self.insert(quad)
}
fn remove(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
self.remove(quad)
}
}
fn id2str_merge(
_key: &[u8], // the key being merged
old_value: Option<&[u8]>, // the previous value, if one existed
merged_bytes: &[u8], // the new bytes being merged in
) -> Option<Vec<u8>> {
Some(if let Some(value) = old_value {
let mut value = value.to_vec();
let number = u32::from_be_bytes(value[..4].try_into().ok()?);
value[..4].copy_from_slice(&number.saturating_add(1).to_be_bytes()); //TODO: check
value
} else {
let mut value = Vec::with_capacity(merged_bytes.len() + 4);
value.extend_from_slice(&1_u32.to_be_bytes());
value.extend_from_slice(merged_bytes);
value
})
}
#[cfg(test)]
mod tests {
use super::*;
@ -1283,60 +847,4 @@ mod tests {
.is_none());
Ok(())
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_strings_removal_in_transaction() -> std::io::Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let quad2 = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o2"),
NamedNodeRef::new_unchecked("http://example.com/g"),
);
let storage = Storage::new()?;
transac(&storage, |t| t.insert(quad))?;
transac(&storage, |t| t.insert(quad2))?;
transac(&storage, |t| t.remove(quad2))?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_some());
assert!(storage
.get_str(&StrHash::new("http://example.com/o2"))?
.is_none());
transac(&storage, |t| t.remove(quad))?;
assert!(storage
.get_str(&StrHash::new("http://example.com/s"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/p"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/o"))?
.is_none());
assert!(storage
.get_str(&StrHash::new("http://example.com/g"))?
.is_some());
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
fn transac<T>(
storage: &Storage,
f: impl Fn(StorageTransaction<'_>) -> Result<T, UnabortableTransactionError>,
) -> Result<(), TransactionError<std::io::Error>> {
storage.transaction(|t| {
f(t)?;
Ok(())
})
}
}

@ -0,0 +1,166 @@
use rocksdb::{ColumnFamily, DBPinnableSlice, DBRawIterator, Env, Error, Options, WriteBatch, DB};
use std::env::temp_dir;
use std::io::{self, Result};
use std::mem::transmute;
use std::path::Path;
use std::sync::Arc;
#[derive(Clone)]
pub struct Db(Arc<DB>);
impl Db {
pub fn new(column_families: &[&str]) -> Result<Self> {
//TODO: temp dir should not be useful
let temp_dir = if cfg!(target_os = "linux") {
"/dev/shm/oxigraph-rocksdb".into()
} else {
temp_dir().join("oxigraph-rocksdb-in-memory")
};
Ok(Self(Arc::new(Self::do_open(
&temp_dir,
column_families,
true,
)?)))
}
pub fn open(path: &Path, column_families: &[&str]) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?)))
}
fn do_open(path: &Path, column_families: &[&str], mem_env: bool) -> Result<DB> {
let mut options = Options::default();
options.create_if_missing(true);
options.create_missing_column_families(true);
if mem_env {
options.set_env(&Env::mem_env().map_err(map_err)?);
}
DB::open_cf(&options, path, column_families).map_err(map_err)
}
pub fn open_tree(&self, name: &'static str) -> Tree {
Tree {
db: self.0.clone(),
cf_name: name,
}
}
pub fn flush(&self) -> Result<()> {
self.0.flush().map_err(map_err)
}
pub fn get(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
self.0.get_pinned(key).map_err(map_err)
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.0.put(key, value).map_err(map_err)
}
}
#[derive(Clone)]
pub struct Tree {
db: Arc<DB>,
cf_name: &'static str,
}
impl Tree {
pub fn get(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> {
self.db.get_pinned_cf(self.get_cf(), key).map_err(map_err)
}
pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
Ok(self.get(key)?.is_some()) //TODO: optimize
}
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.db.put_cf(self.get_cf(), key, value).map_err(map_err)
}
pub fn insert_empty(&self, key: &[u8]) -> Result<()> {
self.insert(key, &[])
}
pub fn remove(&self, key: &[u8]) -> Result<()> {
self.db.delete_cf(self.get_cf(), key).map_err(map_err)
}
pub fn clear(&self) -> Result<()> {
let mut batch = WriteBatch::default();
batch.delete_range_cf(self.get_cf(), [].as_ref(), [u8::MAX; 257].as_ref());
self.db.write(batch).map_err(map_err)
}
pub fn iter(&self) -> Iter {
self.scan_prefix(&[])
}
#[allow(unsafe_code)]
pub fn scan_prefix(&self, prefix: &[u8]) -> Iter {
let mut iter = self.db.raw_iterator_cf(self.get_cf());
iter.seek(&prefix);
// Safe because we clone the same DB from which we take an iterator
unsafe { Iter::new(iter, self.db.clone(), prefix.into()) }
}
pub fn len(&self) -> usize {
let mut count = 0;
let mut iter = self.iter();
while iter.is_valid() {
count += 1;
iter.next();
}
count
}
pub fn is_empty(&self) -> bool {
self.iter().key().is_none()
}
#[allow(clippy::expect_used)]
fn get_cf(&self) -> &ColumnFamily {
self.db
.cf_handle(self.cf_name)
.expect("A column family that should exist in RocksDB does not exist")
}
}
pub struct Iter {
iter: DBRawIterator<'static>,
prefix: Vec<u8>,
_db: Arc<DB>, // needed to ensure that DB still lives while iter is used
}
impl Iter {
/// Creates a static iterator from a non static one by keeping a ARC reference to the database
/// Caller must ensure that the iterator belongs to the same database
///
/// This unsafe method is required to get static iterators and ease the usage of the library.
#[allow(unsafe_code, clippy::useless_transmute)]
unsafe fn new(iter: DBRawIterator<'_>, db: Arc<DB>, prefix: Vec<u8>) -> Self {
Self {
iter: transmute(iter),
prefix,
_db: db,
}
}
pub fn is_valid(&self) -> bool {
self.iter.valid()
}
pub fn key(&self) -> Option<&[u8]> {
self.iter.key().filter(|k| k.starts_with(&self.prefix))
}
pub fn value(&self) -> Option<&[u8]> {
self.iter.value()
}
pub fn next(&mut self) {
self.iter.next()
}
}
fn map_err(e: Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}

@ -1,113 +0,0 @@
use std::io::Result;
use std::path::Path;
#[derive(Clone)]
pub struct Db(sled::Db);
impl Db {
pub fn new() -> Result<Self> {
Ok(Self(sled::Config::new().temporary(true).open()?))
}
pub fn open(path: &Path) -> Result<Self> {
Ok(Self(sled::Config::new().path(path).open()?))
}
pub fn open_tree(&self, name: &'static str) -> Result<Tree> {
Ok(Tree(self.0.open_tree(name)?))
}
pub fn flush(&self) -> Result<()> {
self.0.flush()?;
Ok(())
}
pub async fn flush_async(&self) -> Result<()> {
self.0.flush_async().await?;
Ok(())
}
pub fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
Ok(self.0.get(key)?)
}
pub fn insert(&self, key: &[u8], value: impl Into<sled::IVec>) -> Result<bool> {
Ok(self.0.insert(key, value)?.is_none())
}
}
#[derive(Clone)]
pub struct Tree(sled::Tree);
impl Tree {
pub fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
Ok(self.0.get(key)?)
}
pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
Ok(self.0.contains_key(key)?)
}
pub fn insert(&self, key: &[u8], value: impl Into<sled::IVec>) -> Result<bool> {
Ok(self.0.insert(key, value)?.is_none())
}
pub fn insert_empty(&self, key: &[u8]) -> Result<bool> {
self.insert(key, &[])
}
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.0.merge(key, value)?;
Ok(())
}
pub fn remove(&self, key: &[u8]) -> Result<bool> {
Ok(self.0.remove(key)?.is_some())
}
pub fn update_and_fetch<V: Into<sled::IVec>>(
&self,
key: &[u8],
f: impl FnMut(Option<&[u8]>) -> Option<V>,
) -> Result<Option<sled::IVec>> {
Ok(self.0.update_and_fetch(key, f)?)
}
pub fn clear(&self) -> Result<()> {
Ok(self.0.clear()?)
}
pub fn iter(&self) -> sled::Iter {
self.0.iter()
}
pub fn scan_prefix(&self, prefix: &[u8]) -> sled::Iter {
self.0.scan_prefix(prefix)
}
pub fn len(&self) -> usize {
self.0.len()
}
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
pub fn flush(&self) -> Result<()> {
self.0.flush()?;
Ok(())
}
pub fn set_merge_operator(
&mut self,
merge_operator: impl Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static,
) {
self.0.set_merge_operator(merge_operator)
}
pub fn as_sled(&self) -> &sled::Tree {
&self.0
}
}
pub type Iter = sled::Iter;

@ -23,7 +23,6 @@
//! };
//! # Result::<_,Box<dyn std::error::Error>>::Ok(())
//! ```
use crate::io::{DatasetFormat, GraphFormat};
use crate::model::*;
use crate::sparql::{
@ -32,22 +31,14 @@ use crate::sparql::{
};
use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::StorageTransaction;
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
#[cfg(not(target_arch = "wasm32"))]
pub use crate::storage::{
ConflictableTransactionError, TransactionError, UnabortableTransactionError,
};
use std::io::{BufRead, Write};
use std::path::Path;
use std::{fmt, io, str};
/// An on-on disk [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset).
/// Allows to query and update it using SPARQL.
/// It is based on the [Sled](https://sled.rs/) key-value database.
///
/// Warning: Sled is not stable yet and might break its storage format.
/// It is based on the [RocksDB](https://rocksdb.org/) key-value store.
///
/// Usage example:
/// ```
@ -236,41 +227,6 @@ impl Store {
)
}
/// Executes an ACID transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// The transaction is rollbacked if the closure returns `Err`.
///
/// Usage example:
/// ```
/// use oxigraph::store::{ConflictableTransactionError, Store};
/// use oxigraph::model::*;
/// use std::convert::Infallible;
///
/// let store = Store::new()?;
///
/// let ex = NamedNodeRef::new("http://example.com")?;
/// let quad = QuadRef::new(ex, ex, ex, ex);
///
/// // transaction
/// store.transaction(|transaction| {
/// transaction.insert(quad)?;
/// Ok(()) as Result<(),ConflictableTransactionError<Infallible>>
/// })?;
///
/// assert!(store.contains(quad)?);
/// assert!(store.contains_named_graph(ex)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn transaction<T, E>(
&self,
f: impl Fn(Transaction<'_>) -> Result<T, ConflictableTransactionError<E>>,
) -> Result<T, TransactionError<E>> {
self.storage
.transaction(|storage| f(Transaction { storage }))
}
/// Loads a graph file (i.e. triples) into the store
///
/// Warning: This functions saves the triples in a not atomic way.
@ -604,17 +560,6 @@ impl Store {
pub fn flush(&self) -> io::Result<()> {
self.storage.flush()
}
/// Asynchronously flushes all buffers and ensures that all writes are saved on disk.
///
/// Flushes are automatically done for most platform using background threads.
/// However, calling this method explicitly is still required for Windows and Android.
///
/// A [sync version](SledStore::flush) is also available.
#[cfg(not(target_arch = "wasm32"))]
pub async fn flush_async(&self) -> io::Result<()> {
self.storage.flush_async().await
}
}
impl fmt::Display for Store {
@ -626,195 +571,6 @@ impl fmt::Display for Store {
}
}
/// Allows inserting and deleting quads during an ACID transaction with the [`Store`].
#[cfg(not(target_arch = "wasm32"))]
pub struct Transaction<'a> {
storage: StorageTransaction<'a>,
}
#[cfg(not(target_arch = "wasm32"))]
impl Transaction<'_> {
/// Loads a graph file (i.e. triples) into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::GraphFormat;
/// use oxigraph::model::*;
/// use oxigraph::store::ConflictableTransactionError;
///
/// let store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.transaction(|transaction| {
/// transaction.load_graph(file.as_ref(), GraphFormat::NTriples, &GraphName::DefaultGraph, None)?;
/// Ok(()) as Result<(),ConflictableTransactionError<std::io::Error>>
/// })?;
///
/// // we inspect the store content
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
///
/// If the file parsing fails in the middle of the file, the triples read before are still
/// considered by the transaction. Rollback the transaction by making the transaction closure
/// return an error if you don't want that.
/// Moving up the parsing error through the transaction is enough to do that.
///
/// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
pub fn load_graph<'a>(
&self,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> Result<(), UnabortableTransactionError> {
load_graph(
&self.storage,
reader,
format,
to_graph_name.into(),
base_iri,
)?;
Ok(())
}
/// Loads a dataset file (i.e. quads) into the store. into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// Usage example:
/// ```
/// use oxigraph::store::{Store, ConflictableTransactionError};
/// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// store.transaction(|transaction| {
/// transaction.load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
/// Ok(()) as Result<(),ConflictableTransactionError<std::io::Error>>
/// })?;
///
/// // we inspect the store content
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
///
/// If the file parsing fails in the middle of the file, the quads read before are still
/// considered by the transaction. Rollback the transaction by making the transaction closure
/// return an error if you don't want that.
/// Moving up the parsing error through the transaction is enough to do that.
///
/// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
pub fn load_dataset(
&self,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
) -> Result<(), UnabortableTransactionError> {
Ok(load_dataset(&self.storage, reader, format, base_iri)?)
}
/// Adds a quad to this store during the transaction.
///
/// Returns `true` if the quad was not already in the store.
///
/// Usage example:
/// ```
/// use oxigraph::store::{Store, ConflictableTransactionError};
/// use oxigraph::model::*;
///
/// let ex = NamedNodeRef::new("http://example.com")?;
/// let quad = QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph);
///
/// let store = Store::new()?;
/// store.transaction(|transaction| {
/// transaction.insert(quad)?;
/// Ok(()) as Result<(),ConflictableTransactionError<std::io::Error>>
/// })?;
///
/// assert!(store.contains(quad)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert<'a>(
&self,
quad: impl Into<QuadRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
self.storage.insert(quad.into())
}
/// Removes a quad from this store during the transaction.
///
/// Returns `true` if the quad was in the store and has been removed.
///
/// Usage example:
/// ```
/// use oxigraph::store::{Store, ConflictableTransactionError};
/// use oxigraph::model::*;
///
/// let ex = NamedNodeRef::new("http://example.com")?;
/// let quad = QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph);
///
/// let store = Store::new()?;
/// store.insert(quad)?;
///
/// store.transaction(|transaction| {
/// transaction.remove(quad)?;
/// Ok(()) as Result<(),ConflictableTransactionError<std::io::Error>>
/// })?;
///
/// assert!(!store.contains(quad)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn remove<'a>(
&self,
quad: impl Into<QuadRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
self.storage.remove(quad.into())
}
/// Inserts a graph into this store during the transaction
///
/// Returns `true` if the graph was not already in the store.
///
/// Usage example:
/// ```
/// use oxigraph::store::{Store, ConflictableTransactionError};
/// use oxigraph::model::*;
///
/// let ex = NamedNodeRef::new("http://example.com")?;
///
/// let store = Store::new()?;
/// store.transaction(|transaction| {
/// transaction.insert_named_graph(ex)?;
/// Ok(()) as Result<(),ConflictableTransactionError<std::io::Error>>
/// })?;
///
/// assert_eq!(store.named_graphs().collect::<Result<Vec<_>,_>>()?, vec![ex.into_owned().into()]);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
pub fn insert_named_graph<'a>(
&self,
graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
) -> Result<bool, UnabortableTransactionError> {
self.storage.insert_named_graph(graph_name.into())
}
}
/// An iterator returning the quads contained in a [`Store`].
pub struct QuadIter {
iter: ChainedDecodingQuadIterator,
@ -855,7 +611,6 @@ impl Iterator for GraphNameIter {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn store() -> io::Result<()> {
use crate::model::*;
@ -912,17 +667,13 @@ fn store() -> io::Result<()> {
for t in &default_quads {
assert!(store.insert(t)?);
}
assert!(!store.insert(&default_quad)?);
let result: Result<_, TransactionError<io::Error>> = store.transaction(|t| {
assert!(t.remove(&default_quad)?);
assert!(!t.remove(&default_quad)?);
assert!(t.insert(&named_quad)?);
assert!(!t.insert(&named_quad)?);
assert!(t.insert(&default_quad)?);
assert!(!t.insert(&default_quad)?);
Ok(())
});
result?;
assert!(store.remove(&default_quad)?);
assert!(!store.remove(&default_quad)?);
assert!(store.insert(&named_quad)?);
assert!(!store.insert(&named_quad)?);
assert!(store.insert(&default_quad)?);
assert!(!store.insert(&default_quad)?);
assert_eq!(store.len(), 4);

@ -0,0 +1 @@
f08a4c5b-0479-408c-80d3-7d4b10d7c7aa

@ -0,0 +1,964 @@
# This is a RocksDB option file.
#
# For detailed file format spec, please refer to the example file
# in examples/rocksdb_option_file_example.ini
#
[Version]
rocksdb_version=6.7.3
options_file_version=1.1
[DBOptions]
write_dbid_to_manifest=false
avoid_unnecessary_blocking_io=false
two_write_queues=false
allow_ingest_behind=false
writable_file_max_buffer_size=1048576
avoid_flush_during_shutdown=false
avoid_flush_during_recovery=false
info_log_level=INFO_LEVEL
access_hint_on_compaction_start=NORMAL
allow_concurrent_memtable_write=true
enable_pipelined_write=false
stats_dump_period_sec=600
stats_persist_period_sec=600
strict_bytes_per_sync=false
WAL_ttl_seconds=0
WAL_size_limit_MB=0
max_subcompactions=1
dump_malloc_stats=false
db_log_dir=
wal_recovery_mode=kPointInTimeRecovery
log_file_time_to_roll=0
enable_write_thread_adaptive_yield=true
recycle_log_file_num=0
table_cache_numshardbits=6
atomic_flush=false
preserve_deletes=false
stats_history_buffer_size=1048576
max_open_files=-1
max_file_opening_threads=16
delete_obsolete_files_period_micros=21600000000
max_background_flushes=-1
write_thread_slow_yield_usec=3
base_background_compactions=-1
manual_wal_flush=false
wal_dir=tests/rockdb_bc_data
max_background_compactions=-1
bytes_per_sync=0
max_background_jobs=2
use_fsync=false
unordered_write=false
fail_if_options_file_error=false
random_access_max_buffer_size=1048576
compaction_readahead_size=0
wal_bytes_per_sync=0
new_table_reader_for_compaction_inputs=false
skip_stats_update_on_db_open=false
persist_stats_to_disk=false
skip_log_error_on_recovery=false
log_readahead_size=0
is_fd_close_on_exec=true
use_adaptive_mutex=false
error_if_exists=false
write_thread_max_yield_usec=100
enable_thread_tracking=false
db_write_buffer_size=0
create_missing_column_families=true
paranoid_checks=true
create_if_missing=true
max_manifest_file_size=1073741824
allow_2pc=false
max_total_wal_size=0
use_direct_io_for_flush_and_compaction=false
manifest_preallocation_size=4194304
use_direct_reads=false
delayed_write_rate=16777216
allow_fallocate=true
max_write_batch_group_size_bytes=1048576
keep_log_file_num=1000
allow_mmap_reads=false
max_log_file_size=0
allow_mmap_writes=false
advise_random_on_open=true
[CFOptions "default"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "default"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "id2str"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "id2str"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "spog"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "spog"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "posg"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "posg"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "ospg"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "ospg"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "gspo"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "gspo"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "gpos"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "gpos"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "gosp"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "gosp"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "dspo"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "dspo"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "dpos"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "dpos"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory
[CFOptions "dosp"]
sample_for_compression=0
compaction_pri=kMinOverlappingRatio
merge_operator=nullptr
compaction_filter_factory=nullptr
memtable_factory=SkipListFactory
memtable_insert_with_hint_prefix_extractor=nullptr
comparator=leveldb.BytewiseComparator
target_file_size_base=67108864
max_sequential_skip_in_iterations=8
compaction_style=kCompactionStyleLevel
max_bytes_for_level_base=268435456
bloom_locality=0
write_buffer_size=67108864
compression_per_level=
memtable_huge_page_size=0
max_successive_merges=0
arena_block_size=8388608
memtable_whole_key_filtering=false
target_file_size_multiplier=1
max_bytes_for_level_multiplier_additional=1:1:1:1:1:1:1
num_levels=7
min_write_buffer_number_to_merge=1
max_write_buffer_number_to_maintain=0
max_write_buffer_number=2
compression=kSnappyCompression
level0_stop_writes_trigger=36
level0_slowdown_writes_trigger=20
compaction_filter=nullptr
level0_file_num_compaction_trigger=4
max_compaction_bytes=1677721600
compaction_options_universal={stop_style=kCompactionStopStyleTotalSize;compression_size_percent=-1;allow_trivial_move=false;max_merge_width=4294967295;max_size_amplification_percent=200;min_merge_width=2;size_ratio=1;}
memtable_prefix_bloom_size_ratio=0.000000
max_write_buffer_size_to_maintain=0
hard_pending_compaction_bytes_limit=274877906944
ttl=2592000
table_factory=BlockBasedTable
soft_pending_compaction_bytes_limit=68719476736
prefix_extractor=nullptr
bottommost_compression=kDisableCompressionOption
force_consistency_checks=false
paranoid_file_checks=false
compaction_options_fifo={allow_compaction=false;max_table_files_size=1073741824;}
max_bytes_for_level_multiplier=10.000000
optimize_filters_for_hits=false
level_compaction_dynamic_level_bytes=false
inplace_update_num_locks=10000
inplace_update_support=false
periodic_compaction_seconds=0
disable_auto_compactions=false
report_bg_io_stats=false
[TableOptions/BlockBasedTable "dosp"]
pin_top_level_index_and_filter=true
enable_index_compression=true
read_amp_bytes_per_bit=8589934592
format_version=2
block_align=false
metadata_block_size=4096
block_size_deviation=10
partition_filters=false
block_size=4096
index_block_restart_interval=1
no_block_cache=false
checksum=kCRC32c
whole_key_filtering=true
index_shortening=kShortenSeparators
data_block_index_type=kDataBlockBinarySearch
index_type=kBinarySearch
verify_compression=false
filter_policy=nullptr
data_block_hash_table_util_ratio=0.750000
pin_l0_filter_and_index_blocks_in_cache=false
block_restart_interval=16
cache_index_and_filter_blocks_with_high_priority=true
cache_index_and_filter_blocks=false
hash_index_allow_collision=true
flush_block_policy_factory=FlushBlockBySizePolicyFactory

@ -1,4 +0,0 @@
segment_size: 524288
use_compression: false
version: 0.34
v

Binary file not shown.

@ -1,8 +1,6 @@
use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*;
#[cfg(not(target_arch = "wasm32"))]
use oxigraph::store::ConflictableTransactionError;
use oxigraph::store::Store;
use std::io;
use std::io::Cursor;
@ -137,44 +135,11 @@ fn test_dump_dataset() -> io::Result<()> {
Ok(())
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_transaction_load_graph() -> io::Result<()> {
let store = Store::new()?;
store.transaction(|t| {
t.load_graph(
Cursor::new(DATA),
GraphFormat::Turtle,
GraphNameRef::DefaultGraph,
None,
)?;
Ok(()) as Result<_, ConflictableTransactionError<io::Error>>
})?;
for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?);
}
Ok(())
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_transaction_load_dataset() -> io::Result<()> {
let store = Store::new()?;
store.transaction(|t| {
t.load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?;
Ok(()) as Result<_, ConflictableTransactionError<io::Error>>
})?;
for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?);
}
Ok(())
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_backward_compatibility() -> io::Result<()> {
{
let store = Store::open("tests/sled_bc_data")?;
let store = Store::open("tests/rocksdb_bc_data")?;
for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?);
}
@ -189,7 +154,7 @@ fn test_backward_compatibility() -> io::Result<()> {
store.named_graphs().collect::<io::Result<Vec<_>>>()?
);
};
reset_dir("tests/sled_bc_data")?;
reset_dir("tests/rocksdb_bc_data")?;
Ok(())
}

@ -14,7 +14,7 @@ use std::io::BufReader;
/// Disk-based RDF store.
///
/// It encodes a `RDF dataset <https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset>`_ and allows to query it using SPARQL.
/// It is based on the `Sled <https://sled.rs/>`_ key-value database
/// It is based on the `RocksDB <https://rocksdb.org/>`_ key-value database
///
/// :param path: the path of the directory in which the store should read and write its data. If the directory does not exist, it is created. If no directory is provided a temporary one is created and removed when the Python garbage collector removes the store.
/// :type path: str or None, optional

@ -11,7 +11,7 @@ Oxigraph Server
Oxigraph Server is a standalone HTTP server providing a graph database implementing the [SPARQL](https://www.w3.org/TR/sparql11-overview/) standard.
Its goal is to provide a compliant, safe, and fast graph database based on the [Sled](https://sled.rs/) key-value stores.
Its goal is to provide a compliant, safe, and fast graph database based on the [RocksDB](https://rocksdb.org/) key-value store.
It is written in Rust.
It also provides a set of utility functions for reading, writing, and processing RDF files.

Loading…
Cancel
Save