Adds wasm32 support to lib crate

On wasm32 the storage layer is a purely in-memory fallback backend that has not been optimized yet.
pull/171/head
Tpt 3 years ago
parent f084cfe332
commit e9608fb2eb
  1. 9
      .github/workflows/build.yml
  2. 10
      lib/Cargo.toml
  3. 12
      lib/src/model/xsd/date_time.rs
  4. 175
      lib/src/storage/fallback_backend.rs
  5. 230
      lib/src/storage/mod.rs
  6. 113
      lib/src/storage/sled_backend.rs
  7. 20
      lib/src/store.rs
  8. 4
      lib/tests/store.rs

@ -42,6 +42,15 @@ jobs:
env:
RUST_BACKTRACE: 1
test_wasm:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- run: rustup update
- run: cargo install wasm-pack
- run: wasm-pack test --node
working-directory: ./lib
python:
runs-on: ubuntu-latest
steps:

@ -22,7 +22,6 @@ sophia = ["sophia_api"]
http_client = ["httparse", "native-tls"]
[dependencies]
sled = "0.34"
quick-xml = "0.22"
rand = "0.8"
md-5 = "0.9"
@ -46,12 +45,21 @@ native-tls = { version = "0.2", optional = true }
json-event-parser = "0.1"
spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
sled = "0.34"
[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
getrandom = {version="0.2", features=["js"]}
[dev-dependencies]
rayon = "1"
criterion = "0.3"
sophia_api = { version = "0.7", features = ["test_macro"] }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3"
[[bench]]
name = "store"
harness = false

@ -10,7 +10,6 @@ use std::error::Error;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::str::FromStr;
use std::time::SystemTime;
use std::time::SystemTimeError;
/// [XML Schema `dateTime` datatype](https://www.w3.org/TR/xmlschema11-2/#dateTime) implementation.
@ -1342,7 +1341,18 @@ impl Timestamp {
}
}
/// Returns the current time as a duration since the Unix epoch (wasm32 build only).
///
/// `std::time::SystemTime` is not used on this target; the clock is read from
/// JavaScript through `js_sys::Date::now()`, which reports milliseconds, hence
/// the division by 1000 to obtain seconds.
#[cfg(target_arch = "wasm32")]
fn since_unix_epoch() -> Result<Duration, DateTimeError> {
Ok(Duration::new(
// NOTE(review): presumably the "months" component of the duration, left at zero - confirm against `Duration::new`.
0,
// Seconds component, possibly fractional since the millisecond count is divided as a float.
Decimal::from_f64(js_sys::Date::now() / 1000.),
))
}
#[cfg(not(target_arch = "wasm32"))]
fn since_unix_epoch() -> Result<Duration, DateTimeError> {
use std::time::SystemTime;
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)?
.try_into()

@ -0,0 +1,175 @@
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::io::Result;
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};
/// In-memory database handle used when no on-disk backend is available.
///
/// Named trees live in a registry shared (behind a mutex) by every clone of the
/// handle; `get` and `insert` on the handle itself operate on a separate
/// `default` tree.
#[derive(Clone)]
pub struct Db {
    trees: Arc<Mutex<BTreeMap<&'static str, Tree>>>,
    default: Tree,
}

impl Db {
    /// Creates an empty database. This implementation never returns an error.
    pub fn new() -> Result<Self> {
        let db = Self {
            trees: Arc::new(Mutex::new(BTreeMap::new())),
            default: Tree::new(),
        };
        Ok(db)
    }

    /// Returns a handle on the tree registered under `name`, creating an empty
    /// tree on first use.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex has been poisoned.
    pub fn open_tree(&self, name: &'static str) -> Result<Tree> {
        let mut registry = self.trees.lock().unwrap();
        let handle = registry.entry(name).or_insert_with(Tree::new).clone();
        Ok(handle)
    }

    /// Does nothing: all data is held in memory.
    pub fn flush(&self) -> Result<()> {
        Ok(())
    }

    /// Reads the value stored under `key` in the default tree.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        self.default.get(key)
    }

    /// Stores `value` under `key` in the default tree and returns `true` when
    /// the key had no value before.
    pub fn insert(&self, key: &[u8], value: impl Into<Vec<u8>>) -> Result<bool> {
        self.default.insert(key, value)
    }
}
/// Signature of the callback used by [`Tree::merge`].
///
/// It receives the key, the value currently stored under that key (if any) and
/// the value being merged in. Returning `None` leaves the key without a value.
type MergeOperator = dyn Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static;

/// In-memory ordered key/value tree.
///
/// Cloning a `Tree` yields another handle on the same data *and* the same merge
/// operator: both live behind shared pointers, so configuring the operator
/// through one handle is visible through every other handle.
///
/// All methods panic if one of the internal locks has been poisoned.
#[derive(Clone)]
pub struct Tree {
    tree: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
    // Kept in a shared, replaceable slot so that `set_merge_operator` affects
    // all clones of this tree and not only the handle it was called on.
    merge_operator: Arc<RwLock<Arc<MergeOperator>>>,
}

impl Tree {
    /// Creates an empty tree whose merge operator overwrites the stored value.
    fn new() -> Self {
        let overwrite: Arc<MergeOperator> =
            Arc::new(|_: &[u8], _: Option<&[u8]>, v: &[u8]| Some(v.to_vec()));
        Self {
            tree: Arc::default(),
            merge_operator: Arc::new(RwLock::new(overwrite)),
        }
    }

    /// Returns a copy of the value stored under `key`, if any.
    pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
        Ok(self.tree.read().unwrap().get(key).cloned()) //TODO: avoid clone
    }

    /// Tells whether a value is stored under `key`.
    pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
        Ok(self.tree.read().unwrap().contains_key(key))
    }

    /// Stores `value` under `key`, replacing any previous value.
    ///
    /// Returns `true` when the key had no value before.
    pub fn insert(&self, key: &[u8], value: impl Into<Vec<u8>>) -> Result<bool> {
        Ok(self
            .tree
            .write()
            .unwrap()
            .insert(key.to_vec(), value.into())
            .is_none())
    }

    /// Stores an empty value under `key`; returns `true` when the key is new.
    pub fn insert_empty(&self, key: &[u8]) -> Result<bool> {
        self.insert(key, Vec::new())
    }

    /// Combines `value` with the value stored under `key` using the merge operator.
    ///
    /// The entry is replaced by the operator's result, or removed (or not
    /// created) when the operator returns `None`.
    pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<()> {
        // Take a private handle on the operator so its lock is not held while
        // the callback runs.
        let operator = Arc::clone(&*self.merge_operator.read().unwrap());
        match self.tree.write().unwrap().entry(key.to_vec()) {
            Entry::Occupied(e) => match operator(key, Some(e.get().as_slice()), value) {
                Some(v) => {
                    *e.into_mut() = v;
                }
                None => {
                    e.remove();
                }
            },
            Entry::Vacant(e) => {
                if let Some(v) = operator(key, None, value) {
                    e.insert(v);
                }
            }
        }
        Ok(())
    }

    /// Removes `key`; returns `true` when a value was actually removed.
    pub fn remove(&self, key: &[u8]) -> Result<bool> {
        Ok(self.tree.write().unwrap().remove(key).is_some())
    }

    /// Replaces the value under `key` by `f(current value)` and returns the new value.
    ///
    /// `f` receives `None` when the key is absent. When `f` returns `None` the
    /// entry is removed (or not created) and `None` is returned.
    pub fn update_and_fetch<V: Into<Vec<u8>>>(
        &self,
        key: &[u8],
        mut f: impl FnMut(Option<&[u8]>) -> Option<V>,
    ) -> Result<Option<Vec<u8>>> {
        Ok(match self.tree.write().unwrap().entry(key.to_vec()) {
            Entry::Occupied(e) => match f(Some(e.get().as_slice())) {
                Some(v) => {
                    let v = v.into();
                    // Overwrite in place to reuse the existing allocation.
                    let e_mut = e.into_mut();
                    e_mut.clear();
                    e_mut.extend_from_slice(&v);
                    Some(v)
                }
                None => {
                    e.remove();
                    None
                }
            },
            Entry::Vacant(e) => match f(None) {
                Some(v) => {
                    let v = v.into();
                    e.insert(v.clone());
                    Some(v)
                }
                None => None,
            },
        })
    }

    /// Removes every entry of the tree.
    pub fn clear(&self) -> Result<()> {
        self.tree.write().unwrap().clear();
        Ok(())
    }

    /// Iterates over a snapshot of all entries, in key order.
    pub fn iter(&self) -> Iter {
        self.scan_prefix(&[])
    }

    /// Iterates over a snapshot of the entries whose key starts with `prefix`,
    /// in key order. An empty prefix matches every entry.
    pub fn scan_prefix(&self, prefix: &[u8]) -> Iter {
        let tree = self.tree.read().unwrap();
        // Keys are ordered, so all keys sharing `prefix` are contiguous and
        // start at the first key >= `prefix`. Borrowed bounds avoid allocating
        // a temporary `Vec` just to express the range.
        let data: Vec<Result<(Vec<u8>, Vec<u8>)>> = tree
            .range::<[u8], _>((
                std::ops::Bound::Included(prefix),
                std::ops::Bound::Unbounded,
            ))
            .take_while(|(k, _)| k.starts_with(prefix))
            .map(|(k, v)| Ok((k.clone(), v.clone())))
            .collect();
        data.into_iter()
    }

    /// Number of entries currently stored.
    pub fn len(&self) -> usize {
        self.tree.read().unwrap().len()
    }

    /// Tells whether the tree holds no entry.
    pub fn is_empty(&self) -> bool {
        self.tree.read().unwrap().is_empty()
    }

    /// Does nothing: all data is held in memory.
    pub fn flush(&self) -> Result<()> {
        Ok(())
    }

    /// Installs the function used by [`Tree::merge`].
    ///
    /// The operator is shared: it applies to every handle cloned from (or
    /// cloned into) this tree, including the one kept by the owning `Db`.
    pub fn set_merge_operator(
        &mut self,
        merge_operator: impl Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static,
    ) {
        let shared: Arc<MergeOperator> = Arc::new(merge_operator);
        *self.merge_operator.write().unwrap() = shared;
    }
}

/// Iterator over a snapshot of `(key, value)` pairs taken from a [`Tree`].
pub type Iter = std::vec::IntoIter<Result<(Vec<u8>, Vec<u8>)>>;

@ -2,12 +2,12 @@ use std::error::Error;
use std::fmt;
use std::path::Path;
#[cfg(not(target_arch = "wasm32"))]
use sled::transaction::{
ConflictableTransactionError as Sled2ConflictableTransactionError,
TransactionError as Sled2TransactionError, TransactionalTree,
UnabortableTransactionError as Sled2UnabortableTransactionError,
};
use sled::{Config, Db, Iter, Transactional, Tree};
use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef};
@ -20,11 +20,19 @@ use crate::storage::binary_encoder::{
};
use crate::storage::io::StoreOrParseError;
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
#[cfg(target_arch = "wasm32")]
use fallback_backend::{Db, Iter, Tree};
#[cfg(not(target_arch = "wasm32"))]
use sled_backend::{Db, Iter, Tree};
use std::convert::TryInto;
mod binary_encoder;
#[cfg(target_arch = "wasm32")]
mod fallback_backend;
pub mod io;
pub mod numeric_encoder;
#[cfg(not(target_arch = "wasm32"))]
mod sled_backend;
pub mod small_string;
/// Low level storage primitives
@ -46,18 +54,20 @@ pub struct Storage {
impl Storage {
pub fn new() -> std::io::Result<Self> {
Self::do_open(&Config::new().temporary(true))
Self::setup(Db::new()?)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path) -> std::io::Result<Self> {
Self::do_open(&Config::new().path(path))
Self::setup(Db::open(path)?)
}
fn do_open(config: &Config) -> std::io::Result<Self> {
let db = config.open()?;
fn setup(db: Db) -> std::io::Result<Self> {
let mut id2str = db.open_tree("id2str")?;
id2str.set_merge_operator(id2str_merge);
let this = Self {
default: db.clone(),
id2str: db.open_tree("id2str")?,
id2str,
spog: db.open_tree("spog")?,
posg: db.open_tree("posg")?,
ospg: db.open_tree("ospg")?,
@ -68,8 +78,8 @@ impl Storage {
dpos: db.open_tree("dpos")?,
dosp: db.open_tree("dosp")?,
graphs: db.open_tree("graphs")?,
default: db,
};
this.id2str.set_merge_operator(id2str_merge);
let mut version = this.ensure_version()?;
if version == 0 {
@ -82,7 +92,7 @@ impl Storage {
}
version = 1;
this.set_version(version)?;
this.graphs.flush()?;
this.default.flush()?;
}
if version == 1 {
// We migrate to v2
@ -91,7 +101,7 @@ impl Storage {
let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes());
new_value.extend_from_slice(&value);
this.id2str.insert(key, new_value)?;
this.id2str.insert(&key, new_value)?;
}
version = 2;
this.set_version(version)?;
@ -112,7 +122,7 @@ impl Storage {
}
fn ensure_version(&self) -> std::io::Result<u64> {
Ok(if let Some(version) = self.default.get("oxversion")? {
Ok(if let Some(version) = self.default.get(b"oxversion")? {
let mut buffer = [0; 8];
buffer.copy_from_slice(&version);
u64::from_be_bytes(buffer)
@ -123,26 +133,30 @@ impl Storage {
}
fn set_version(&self, version: u64) -> std::io::Result<()> {
self.default.insert("oxversion", &version.to_be_bytes())?;
self.default
.insert(b"oxversion", version.to_be_bytes().to_vec())?;
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub fn transaction<T, E>(
&self,
f: impl Fn(StorageTransaction<'_>) -> Result<T, ConflictableTransactionError<E>>,
) -> Result<T, TransactionError<E>> {
use sled::Transactional;
Ok((
&self.id2str,
&self.spog,
&self.posg,
&self.ospg,
&self.gspo,
&self.gpos,
&self.gosp,
&self.dspo,
&self.dpos,
&self.dosp,
&self.graphs,
self.id2str.as_sled(),
self.spog.as_sled(),
self.posg.as_sled(),
self.ospg.as_sled(),
self.gspo.as_sled(),
self.gpos.as_sled(),
self.gosp.as_sled(),
self.dspo.as_sled(),
self.dpos.as_sled(),
self.dosp.as_sled(),
self.graphs.as_sled(),
)
.transaction(
move |(id2str, spog, posg, ospg, gspo, gpos, gosp, dspo, dpos, dosp, graphs)| {
@ -175,10 +189,10 @@ impl Storage {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
Ok(self.dspo.contains_key(buffer)?)
Ok(self.dspo.contains_key(&buffer)?)
} else {
write_gspo_quad(&mut buffer, quad);
Ok(self.gspo.contains_key(buffer)?)
Ok(self.gspo.contains_key(&buffer)?)
}
}
@ -246,20 +260,17 @@ impl Storage {
}
pub fn quads(&self) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dspo_quads(Vec::default()),
self.gspo_quads(Vec::default()),
)
ChainedDecodingQuadIterator::pair(self.dspo_quads(&[]), self.gspo_quads(&[]))
}
fn quads_in_named_graph(&self) -> DecodingQuadIterator {
self.gspo_quads(Vec::default())
self.gspo_quads(&[])
}
fn quads_for_subject(&self, subject: &EncodedTerm) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dspo_quads(encode_term(subject)),
self.spog_quads(encode_term(subject)),
self.dspo_quads(&encode_term(subject)),
self.spog_quads(&encode_term(subject)),
)
}
@ -269,8 +280,8 @@ impl Storage {
predicate: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dspo_quads(encode_term_pair(subject, predicate)),
self.spog_quads(encode_term_pair(subject, predicate)),
self.dspo_quads(&encode_term_pair(subject, predicate)),
self.spog_quads(&encode_term_pair(subject, predicate)),
)
}
@ -281,8 +292,8 @@ impl Storage {
object: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dspo_quads(encode_term_triple(subject, predicate, object)),
self.spog_quads(encode_term_triple(subject, predicate, object)),
self.dspo_quads(&encode_term_triple(subject, predicate, object)),
self.spog_quads(&encode_term_triple(subject, predicate, object)),
)
}
@ -292,15 +303,15 @@ impl Storage {
object: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dosp_quads(encode_term_pair(object, subject)),
self.ospg_quads(encode_term_pair(object, subject)),
self.dosp_quads(&encode_term_pair(object, subject)),
self.ospg_quads(&encode_term_pair(object, subject)),
)
}
fn quads_for_predicate(&self, predicate: &EncodedTerm) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dpos_quads(encode_term(predicate)),
self.posg_quads(encode_term(predicate)),
self.dpos_quads(&encode_term(predicate)),
self.posg_quads(&encode_term(predicate)),
)
}
@ -310,23 +321,23 @@ impl Storage {
object: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dpos_quads(encode_term_pair(predicate, object)),
self.posg_quads(encode_term_pair(predicate, object)),
self.dpos_quads(&encode_term_pair(predicate, object)),
self.posg_quads(&encode_term_pair(predicate, object)),
)
}
fn quads_for_object(&self, object: &EncodedTerm) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::pair(
self.dosp_quads(encode_term(object)),
self.ospg_quads(encode_term(object)),
self.dosp_quads(&encode_term(object)),
self.ospg_quads(&encode_term(object)),
)
}
fn quads_for_graph(&self, graph_name: &EncodedTerm) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(Vec::default())
self.dspo_quads(&Vec::default())
} else {
self.gspo_quads(encode_term(graph_name))
self.gspo_quads(&encode_term(graph_name))
})
}
@ -336,9 +347,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term(subject))
self.dspo_quads(&encode_term(subject))
} else {
self.gspo_quads(encode_term_pair(graph_name, subject))
self.gspo_quads(&encode_term_pair(graph_name, subject))
})
}
@ -349,9 +360,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term_pair(subject, predicate))
self.dspo_quads(&encode_term_pair(subject, predicate))
} else {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate))
self.gspo_quads(&encode_term_triple(graph_name, subject, predicate))
})
}
@ -363,9 +374,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term_triple(subject, predicate, object))
self.dspo_quads(&encode_term_triple(subject, predicate, object))
} else {
self.gspo_quads(encode_term_quad(graph_name, subject, predicate, object))
self.gspo_quads(&encode_term_quad(graph_name, subject, predicate, object))
})
}
@ -376,9 +387,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dosp_quads(encode_term_pair(object, subject))
self.dosp_quads(&encode_term_pair(object, subject))
} else {
self.gosp_quads(encode_term_triple(graph_name, object, subject))
self.gosp_quads(&encode_term_triple(graph_name, object, subject))
})
}
@ -388,9 +399,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dpos_quads(encode_term(predicate))
self.dpos_quads(&encode_term(predicate))
} else {
self.gpos_quads(encode_term_pair(graph_name, predicate))
self.gpos_quads(&encode_term_pair(graph_name, predicate))
})
}
@ -401,9 +412,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dpos_quads(encode_term_pair(predicate, object))
self.dpos_quads(&encode_term_pair(predicate, object))
} else {
self.gpos_quads(encode_term_triple(graph_name, predicate, object))
self.gpos_quads(&encode_term_triple(graph_name, predicate, object))
})
}
@ -413,9 +424,9 @@ impl Storage {
graph_name: &EncodedTerm,
) -> ChainedDecodingQuadIterator {
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
self.dosp_quads(encode_term(object))
self.dosp_quads(&encode_term(object))
} else {
self.gosp_quads(encode_term_pair(graph_name, object))
self.gosp_quads(&encode_term_pair(graph_name, object))
})
}
@ -426,50 +437,46 @@ impl Storage {
}
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
Ok(self.graphs.contains_key(&encode_term(graph_name))?)
self.graphs.contains_key(&encode_term(graph_name))
}
fn spog_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.spog, prefix, QuadEncoding::Spog)
}
fn posg_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn posg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.posg, prefix, QuadEncoding::Posg)
}
fn ospg_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn ospg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.ospg, prefix, QuadEncoding::Ospg)
}
fn gspo_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn gspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gspo, prefix, QuadEncoding::Gspo)
}
fn gpos_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn gpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gpos, prefix, QuadEncoding::Gpos)
}
fn gosp_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn gosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gosp, prefix, QuadEncoding::Gosp)
}
fn dspo_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn dspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dspo, prefix, QuadEncoding::Dspo)
}
fn dpos_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn dpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dpos, prefix, QuadEncoding::Dpos)
}
fn dosp_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
fn dosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dosp, prefix, QuadEncoding::Dosp)
}
fn inner_quads(
tree: &Tree,
prefix: impl AsRef<[u8]>,
encoding: QuadEncoding,
) -> DecodingQuadIterator {
fn inner_quads(tree: &Tree, prefix: &[u8], encoding: QuadEncoding) -> DecodingQuadIterator {
DecodingQuadIterator {
iter: tree.scan_prefix(prefix),
encoding,
@ -482,51 +489,51 @@ impl Storage {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &encoded);
let is_new = self.dspo.insert(buffer.as_slice(), &[])?.is_none();
let is_new = self.dspo.insert_empty(buffer.as_slice())?;
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_pos_quad(&mut buffer, &encoded);
self.dpos.insert(buffer.as_slice(), &[])?;
self.dpos.insert_empty(buffer.as_slice())?;
buffer.clear();
write_osp_quad(&mut buffer, &encoded);
self.dosp.insert(buffer.as_slice(), &[])?;
self.dosp.insert_empty(buffer.as_slice())?;
buffer.clear();
}
Ok(is_new)
} else {
write_spog_quad(&mut buffer, &encoded);
let is_new = self.spog.insert(buffer.as_slice(), &[])?.is_none();
let is_new = self.spog.insert_empty(buffer.as_slice())?;
if is_new {
buffer.clear();
self.insert_quad_triple(quad, &encoded)?;
write_posg_quad(&mut buffer, &encoded);
self.posg.insert(buffer.as_slice(), &[])?;
self.posg.insert_empty(buffer.as_slice())?;
buffer.clear();
write_ospg_quad(&mut buffer, &encoded);
self.ospg.insert(buffer.as_slice(), &[])?;
self.ospg.insert_empty(buffer.as_slice())?;
buffer.clear();
write_gspo_quad(&mut buffer, &encoded);
self.gspo.insert(buffer.as_slice(), &[])?;
self.gspo.insert_empty(buffer.as_slice())?;
buffer.clear();
write_gpos_quad(&mut buffer, &encoded);
self.gpos.insert(buffer.as_slice(), &[])?;
self.gpos.insert_empty(buffer.as_slice())?;
buffer.clear();
write_gosp_quad(&mut buffer, &encoded);
self.gosp.insert(buffer.as_slice(), &[])?;
self.gosp.insert_empty(buffer.as_slice())?;
buffer.clear();
write_term(&mut buffer, &encoded.graph_name);
if self.graphs.insert(&buffer, &[])?.is_none() {
if self.graphs.insert_empty(&buffer)? {
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
}
buffer.clear();
@ -545,7 +552,7 @@ impl Storage {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
let is_present = self.dspo.remove(buffer.as_slice())?.is_some();
let is_present = self.dspo.remove(buffer.as_slice())?;
if is_present {
buffer.clear();
@ -564,7 +571,7 @@ impl Storage {
Ok(is_present)
} else {
write_spog_quad(&mut buffer, quad);
let is_present = self.spog.remove(buffer.as_slice())?.is_some();
let is_present = self.spog.remove(buffer.as_slice())?;
if is_present {
buffer.clear();
@ -607,7 +614,7 @@ impl Storage {
}
fn insert_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
Ok(self.graphs.insert(&encode_term(graph_name), &[])?.is_none())
self.graphs.insert_empty(&encode_term(graph_name))
}
pub fn clear_graph(&self, graph_name: GraphNameRef<'_>) -> std::io::Result<()> {
@ -636,14 +643,12 @@ impl Storage {
for quad in self.quads_for_graph(&graph_name) {
self.remove_encoded(&quad?)?;
}
Ok(
if self.graphs.remove(&encode_term(&graph_name))?.is_some() {
self.remove_term(&graph_name)?;
true
} else {
false
},
)
Ok(if self.graphs.remove(&encode_term(&graph_name))? {
self.remove_term(&graph_name)?;
true
} else {
false
})
}
pub fn remove_all_named_graphs(&self) -> std::io::Result<()> {
@ -672,11 +677,13 @@ impl Storage {
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> std::io::Result<()> {
self.default.flush()?;
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
pub async fn flush_async(&self) -> std::io::Result<()> {
self.default.flush_async().await?;
Ok(())
@ -684,14 +691,14 @@ impl Storage {
pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> {
self.id2str
.get(key.to_be_bytes())?
.get(&key.to_be_bytes())?
.map(|v| String::from_utf8(v[4..].to_vec()))
.transpose()
.map_err(invalid_data_error)
}
pub fn contains_str(&self, key: &StrHash) -> std::io::Result<bool> {
Ok(self.id2str.contains_key(key.to_be_bytes())?)
self.id2str.contains_key(&key.to_be_bytes())
}
}
@ -761,6 +768,7 @@ impl Iterator for DecodingGraphIterator {
}
}
#[cfg(not(target_arch = "wasm32"))]
pub struct StorageTransaction<'a> {
id2str: &'a TransactionalTree,
spog: &'a TransactionalTree,
@ -775,6 +783,7 @@ pub struct StorageTransaction<'a> {
graphs: &'a TransactionalTree,
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StorageTransaction<'a> {
pub fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
let encoded = quad.into();
@ -923,6 +932,7 @@ impl<'a> StorageTransaction<'a> {
}
/// Error returned by a Sled transaction
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum TransactionError<T> {
/// A failure returned by the API user that have aborted the transaction
@ -931,6 +941,7 @@ pub enum TransactionError<T> {
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: fmt::Display> fmt::Display for TransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@ -940,6 +951,7 @@ impl<T: fmt::Display> fmt::Display for TransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Error + 'static> Error for TransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
@ -949,6 +961,7 @@ impl<T: Error + 'static> Error for TransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<Sled2TransactionError<T>> for TransactionError<T> {
fn from(e: Sled2TransactionError<T>) -> Self {
match e {
@ -958,6 +971,7 @@ impl<T> From<Sled2TransactionError<T>> for TransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Into<Self>> From<TransactionError<T>> for std::io::Error {
fn from(e: TransactionError<T>) -> Self {
match e {
@ -969,6 +983,7 @@ impl<T: Into<Self>> From<TransactionError<T>> for std::io::Error {
/// An error returned from the transaction methods.
/// Should be returned as it is
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum UnabortableTransactionError {
#[doc(hidden)]
@ -977,6 +992,7 @@ pub enum UnabortableTransactionError {
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl fmt::Display for UnabortableTransactionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@ -986,6 +1002,7 @@ impl fmt::Display for UnabortableTransactionError {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl Error for UnabortableTransactionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
@ -995,6 +1012,7 @@ impl Error for UnabortableTransactionError {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<UnabortableTransactionError> for EvaluationError {
fn from(e: UnabortableTransactionError) -> Self {
match e {
@ -1004,6 +1022,7 @@ impl From<UnabortableTransactionError> for EvaluationError {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<StoreOrParseError<Self>> for UnabortableTransactionError {
fn from(e: StoreOrParseError<Self>) -> Self {
match e {
@ -1013,6 +1032,7 @@ impl From<StoreOrParseError<Self>> for UnabortableTransactionError {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl From<Sled2UnabortableTransactionError> for UnabortableTransactionError {
fn from(e: Sled2UnabortableTransactionError) -> Self {
match e {
@ -1023,6 +1043,7 @@ impl From<Sled2UnabortableTransactionError> for UnabortableTransactionError {
}
/// An error returned from the transaction closure
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug)]
pub enum ConflictableTransactionError<T> {
/// A failure returned by the user that will abort the transaction
@ -1033,6 +1054,7 @@ pub enum ConflictableTransactionError<T> {
Storage(std::io::Error),
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: fmt::Display> fmt::Display for ConflictableTransactionError<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
@ -1043,6 +1065,7 @@ impl<T: fmt::Display> fmt::Display for ConflictableTransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T: Error + 'static> Error for ConflictableTransactionError<T> {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
@ -1053,6 +1076,7 @@ impl<T: Error + 'static> Error for ConflictableTransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<UnabortableTransactionError> for ConflictableTransactionError<T> {
fn from(e: UnabortableTransactionError) -> Self {
match e {
@ -1062,6 +1086,7 @@ impl<T> From<UnabortableTransactionError> for ConflictableTransactionError<T> {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<T> From<ConflictableTransactionError<T>> for Sled2ConflictableTransactionError<T> {
fn from(e: ConflictableTransactionError<T>) -> Self {
match e {
@ -1084,6 +1109,7 @@ impl StrLookup for Storage {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StrLookup for StorageTransaction<'a> {
type Error = UnabortableTransactionError;
@ -1100,12 +1126,12 @@ impl TermEncoder for Storage {
type Error = std::io::Error;
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
self.id2str.merge(key.to_be_bytes(), value)?;
self.id2str.merge(&key.to_be_bytes(), value.as_bytes())?;
Ok(())
}
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
self.id2str.update_and_fetch(key.to_be_bytes(), |old| {
self.id2str.update_and_fetch(&key.to_be_bytes(), |old| {
let old = old?;
match u32::from_be_bytes(old[..4].try_into().ok()?) {
0 | 1 => None,
@ -1121,6 +1147,7 @@ impl TermEncoder for Storage {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> TermEncoder for StorageTransaction<'a> {
type Error = UnabortableTransactionError;
@ -1176,6 +1203,7 @@ impl StorageLike for Storage {
}
}
#[cfg(not(target_arch = "wasm32"))]
impl<'a> StorageLike for StorageTransaction<'a> {
fn insert(&self, quad: QuadRef<'_>) -> Result<bool, UnabortableTransactionError> {
self.insert(quad)
@ -1258,6 +1286,7 @@ mod tests {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_strings_removal_in_transaction() -> std::io::Result<()> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
@ -1301,6 +1330,7 @@ mod tests {
Ok(())
}
#[cfg(not(target_arch = "wasm32"))]
fn transac<T>(
storage: &Storage,
f: impl Fn(StorageTransaction<'_>) -> Result<T, UnabortableTransactionError>,

@ -0,0 +1,113 @@
use std::io::Result;
use std::path::Path;
/// Database handle backed by sled; a newtype around `sled::Db`.
#[derive(Clone)]
pub struct Db(sled::Db);

impl Db {
    /// Opens a database with sled's `temporary(true)` configuration.
    pub fn new() -> Result<Self> {
        let inner = sled::Config::new().temporary(true).open()?;
        Ok(Self(inner))
    }

    /// Opens the database located at `path`.
    pub fn open(path: &Path) -> Result<Self> {
        let inner = sled::Config::new().path(path).open()?;
        Ok(Self(inner))
    }

    /// Opens the tree called `name` and wraps it in a [`Tree`].
    pub fn open_tree(&self, name: &'static str) -> Result<Tree> {
        let inner = self.0.open_tree(name)?;
        Ok(Tree(inner))
    }

    /// Flushes the database, discarding the value sled returns on success.
    pub fn flush(&self) -> Result<()> {
        let _ = self.0.flush()?;
        Ok(())
    }

    /// Asynchronous counterpart of [`Db::flush`].
    pub async fn flush_async(&self) -> Result<()> {
        let _ = self.0.flush_async().await?;
        Ok(())
    }

    /// Reads the value stored under `key` directly on the database handle.
    pub fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
        let found = self.0.get(key)?;
        Ok(found)
    }

    /// Stores `value` under `key`; returns `true` when sled's insert returned
    /// no previous value.
    pub fn insert(&self, key: &[u8], value: impl Into<sled::IVec>) -> Result<bool> {
        let previous = self.0.insert(key, value)?;
        Ok(previous.is_none())
    }
}
/// Key/value tree backed by sled; a newtype around `sled::Tree`.
///
/// Every method delegates to the wrapped tree and converts sled errors into
/// `std::io::Error` through `?`.
#[derive(Clone)]
pub struct Tree(sled::Tree);

impl Tree {
    /// Reads the value stored under `key`, if any.
    pub fn get(&self, key: &[u8]) -> Result<Option<sled::IVec>> {
        let found = self.0.get(key)?;
        Ok(found)
    }

    /// Tells whether a value is stored under `key`.
    pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
        let present = self.0.contains_key(key)?;
        Ok(present)
    }

    /// Stores `value` under `key`; returns `true` when sled's insert returned
    /// no previous value.
    pub fn insert(&self, key: &[u8], value: impl Into<sled::IVec>) -> Result<bool> {
        let previous = self.0.insert(key, value)?;
        Ok(previous.is_none())
    }

    /// Stores an empty value under `key`, with the same return value as [`Tree::insert`].
    pub fn insert_empty(&self, key: &[u8]) -> Result<bool> {
        self.insert(key, &[])
    }

    /// Merges `value` into the entry for `key` through sled's merge machinery.
    pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<()> {
        let _ = self.0.merge(key, value)?;
        Ok(())
    }

    /// Removes `key`; returns `true` when sled's remove returned a previous value.
    pub fn remove(&self, key: &[u8]) -> Result<bool> {
        let previous = self.0.remove(key)?;
        Ok(previous.is_some())
    }

    /// Forwards to sled's `update_and_fetch` with the same key and closure.
    pub fn update_and_fetch<V: Into<sled::IVec>>(
        &self,
        key: &[u8],
        f: impl FnMut(Option<&[u8]>) -> Option<V>,
    ) -> Result<Option<sled::IVec>> {
        let updated = self.0.update_and_fetch(key, f)?;
        Ok(updated)
    }

    /// Clears the wrapped tree.
    pub fn clear(&self) -> Result<()> {
        self.0.clear()?;
        Ok(())
    }

    /// Iterator over the entries of the wrapped tree.
    pub fn iter(&self) -> sled::Iter {
        self.0.iter()
    }

    /// Iterator over the entries whose key starts with `prefix`.
    pub fn scan_prefix(&self, prefix: &[u8]) -> sled::Iter {
        self.0.scan_prefix(prefix)
    }

    /// Number of entries, as reported by sled.
    pub fn len(&self) -> usize {
        self.0.len()
    }

    /// Tells whether sled reports the tree as empty.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    /// Flushes the tree, discarding the value sled returns on success.
    pub fn flush(&self) -> Result<()> {
        let _ = self.0.flush()?;
        Ok(())
    }

    /// Installs the function sled uses for [`Tree::merge`].
    pub fn set_merge_operator(
        &mut self,
        merge_operator: impl Fn(&[u8], Option<&[u8]>, &[u8]) -> Option<Vec<u8>> + 'static,
    ) {
        self.0.set_merge_operator(merge_operator)
    }

    /// Gives access to the wrapped `sled::Tree`.
    pub fn as_sled(&self) -> &sled::Tree {
        &self.0
    }
}

/// Iterator type returned by [`Tree::iter`] and [`Tree::scan_prefix`].
pub type Iter = sled::Iter;

@ -32,11 +32,12 @@ use crate::sparql::{
};
use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
pub use crate::storage::ConflictableTransactionError;
pub use crate::storage::TransactionError;
pub use crate::storage::UnabortableTransactionError;
use crate::storage::{
ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageTransaction,
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::StorageTransaction;
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
#[cfg(not(target_arch = "wasm32"))]
pub use crate::storage::{
ConflictableTransactionError, TransactionError, UnabortableTransactionError,
};
use std::convert::TryInto;
use std::io::{BufRead, Write};
@ -93,6 +94,7 @@ impl Store {
}
/// Opens a [`Store`]() and creates it if it does not exist yet.
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Self {
storage: Storage::open(path.as_ref())?,
@ -261,6 +263,7 @@ impl Store {
/// assert!(store.contains_named_graph(ex)?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(not(target_arch = "wasm32"))]
pub fn transaction<T, E>(
&self,
f: impl Fn(Transaction<'_>) -> Result<T, ConflictableTransactionError<E>>,
@ -598,6 +601,7 @@ impl Store {
/// However, calling this method explicitly is still required for Windows and Android.
///
/// An [async version](SledStore::flush_async) is also available.
#[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> io::Result<()> {
self.storage.flush()
}
@ -608,6 +612,7 @@ impl Store {
/// However, calling this method explicitly is still required for Windows and Android.
///
/// A [sync version](SledStore::flush) is also available.
#[cfg(not(target_arch = "wasm32"))]
pub async fn flush_async(&self) -> io::Result<()> {
self.storage.flush_async().await
}
@ -623,10 +628,12 @@ impl fmt::Display for Store {
}
/// Allows inserting and deleting quads during an ACID transaction with the [`Store`].
#[cfg(not(target_arch = "wasm32"))]
pub struct Transaction<'a> {
storage: StorageTransaction<'a>,
}
#[cfg(not(target_arch = "wasm32"))]
impl Transaction<'_> {
/// Loads a graph file (i.e. triples) into the store during the transaction.
///
@ -705,7 +712,7 @@ impl Transaction<'_> {
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
/// ```
///
/// If the file parsing fails in the middle of the file, the quads read before are still
/// considered by the transaction. Rollback the transaction by making the transaction closure
@ -849,6 +856,7 @@ impl Iterator for GraphNameIter {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn store() -> io::Result<()> {
use crate::model::*;

@ -1,6 +1,7 @@
use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*;
#[cfg(not(target_arch = "wasm32"))]
use oxigraph::store::ConflictableTransactionError;
use oxigraph::store::Store;
use std::io;
@ -137,6 +138,7 @@ fn test_dump_dataset() -> io::Result<()> {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_transaction_load_graph() -> io::Result<()> {
let store = Store::new()?;
store.transaction(|t| {
@ -155,6 +157,7 @@ fn test_transaction_load_graph() -> io::Result<()> {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_transaction_load_dataset() -> io::Result<()> {
let store = Store::new()?;
store.transaction(|t| {
@ -168,6 +171,7 @@ fn test_transaction_load_dataset() -> io::Result<()> {
}
#[test]
#[cfg(not(target_arch = "wasm32"))]
fn test_backward_compatibility() -> io::Result<()> {
{
let store = Store::open("tests/sled_bc_data")?;

Loading…
Cancel
Save