Refactors backend code

Makes it easier to implement transactions
pull/171/head
Tpt 3 years ago
parent dfe50cff11
commit e297de73d1
  1. 4
      js/src/store.rs
  2. 128
      lib/src/storage/fallback_backend.rs
  3. 238
      lib/src/storage/mod.rs
  4. 90
      lib/src/storage/rocksdb_backend.rs
  5. 18
      lib/src/store.rs

@ -52,8 +52,8 @@ impl JsStore {
} }
#[wasm_bindgen(getter=size)] #[wasm_bindgen(getter=size)]
pub fn size(&self) -> usize { pub fn size(&self) -> Result<usize, JsValue> {
self.store.len() self.store.len().map_err(to_err)
} }
#[wasm_bindgen(js_name = match)] #[wasm_bindgen(js_name = match)]

@ -2,68 +2,97 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::io::Result; use std::io::Result;
use std::sync::{Arc, Mutex, RwLock}; use std::sync::{Arc, RwLock};
#[derive(Clone)] #[derive(Clone)]
pub struct Db { pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>);
trees: Arc<Mutex<BTreeMap<&'static str, Tree>>>,
default: Tree,
}
impl Db { impl Db {
pub fn new(_column_families: &[&str]) -> Result<Self> { pub fn new(column_families: &'static [&'static str]) -> Result<Self> {
Ok(Self { let mut trees = BTreeMap::new();
trees: Arc::default(), for cf in column_families {
default: Tree::default(), trees.insert(ColumnFamily(*cf), BTreeMap::default());
}) }
} trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
Ok(Self(Arc::new(RwLock::new(trees))))
pub fn open_tree(&self, name: &'static str) -> Result<Tree> { }
Ok(self.trees.lock().unwrap().entry(name).or_default().clone())
pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
let name = ColumnFamily(name);
if self.0.read().unwrap().contains_key(&name) {
Some(name)
} else {
None
}
} }
pub fn flush(&self) -> Result<()> { pub fn flush(&self) -> Result<()> {
Ok(()) Ok(())
} }
}
#[derive(Clone, Default)]
pub struct Tree {
tree: Arc<RwLock<BTreeMap<Vec<u8>, Vec<u8>>>>,
}
impl Tree {
pub fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.tree.read().unwrap().get(key).map(|v| v.to_vec()))
}
pub fn contains_key(&self, key: &[u8]) -> Result<bool> { pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.tree.read().unwrap().contains_key(key.as_ref())) Ok(self
} .0
.read()
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { .unwrap()
self.tree.write().unwrap().insert(key.into(), value.into()); .get(column_family)
.unwrap()
.get(key)
.map(|v| v.to_vec()))
}
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self
.0
.read()
.unwrap()
.get(column_family)
.unwrap()
.contains_key(key.as_ref()))
}
pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
self.0
.write()
.unwrap()
.get_mut(column_family)
.unwrap()
.insert(key.into(), value.into());
Ok(()) Ok(())
} }
pub fn insert_empty(&self, key: &[u8]) -> Result<()> { pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.insert(key, &[]) self.insert(column_family, key, &[])
} }
pub fn remove(&self, key: &[u8]) -> Result<bool> { pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self.tree.write().unwrap().remove(key.as_ref()).is_some()) Ok(self
.0
.write()
.unwrap()
.get_mut(column_family)
.unwrap()
.remove(key.as_ref())
.is_some())
} }
pub fn clear(&self) -> Result<()> { pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> {
Ok(self.tree.write().unwrap().clear()) Ok(self
.0
.write()
.unwrap()
.get_mut(column_family)
.unwrap()
.clear())
} }
pub fn iter(&self) -> Iter { pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(&[]) self.scan_prefix(column_family, &[])
} }
pub fn scan_prefix(&self, prefix: &[u8]) -> Iter { pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter {
let tree = self.tree.read().unwrap(); let trees = self.0.read().unwrap();
let tree = trees.get(column_family).unwrap();
let data: Vec<_> = if prefix.is_empty() { let data: Vec<_> = if prefix.is_empty() {
tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect() tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} else { } else {
@ -77,15 +106,24 @@ impl Tree {
Iter { iter, current } Iter { iter, current }
} }
pub fn len(&self) -> usize { pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
self.tree.read().unwrap().len() Ok(self.0.read().unwrap().get(column_family).unwrap().len())
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
self.tree.read().unwrap().is_empty() Ok(self
.0
.read()
.unwrap()
.get(column_family)
.unwrap()
.is_empty())
} }
} }
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct ColumnFamily(&'static str);
pub struct Iter { pub struct Iter {
iter: std::vec::IntoIter<(Vec<u8>, Vec<u8>)>, iter: std::vec::IntoIter<(Vec<u8>, Vec<u8>)>,
current: Option<(Vec<u8>, Vec<u8>)>, current: Option<(Vec<u8>, Vec<u8>)>,

@ -8,9 +8,10 @@ use crate::storage::binary_encoder::{
}; };
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder}; use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use fallback_backend::{Db, Iter, Tree}; use fallback_backend::{ColumnFamily, Db, Iter};
#[cfg(not(target_arch = "wasm32"))]
use rocksdb_backend::{ColumnFamily, Db, Iter};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use rocksdb_backend::{Db, Iter, Tree};
use std::path::Path; use std::path::Path;
mod binary_encoder; mod binary_encoder;
@ -44,18 +45,18 @@ const COLUMN_FAMILIES: [&str; 11] = [
#[derive(Clone)] #[derive(Clone)]
pub struct Storage { pub struct Storage {
db: Db, db: Db,
default: Tree, default_cf: ColumnFamily,
id2str: Tree, id2str_cf: ColumnFamily,
spog: Tree, spog_cf: ColumnFamily,
posg: Tree, posg_cf: ColumnFamily,
ospg: Tree, ospg_cf: ColumnFamily,
gspo: Tree, gspo_cf: ColumnFamily,
gpos: Tree, gpos_cf: ColumnFamily,
gosp: Tree, gosp_cf: ColumnFamily,
dspo: Tree, dspo_cf: ColumnFamily,
dpos: Tree, dpos_cf: ColumnFamily,
dosp: Tree, dosp_cf: ColumnFamily,
graphs: Tree, graphs_cf: ColumnFamily,
} }
impl Storage { impl Storage {
@ -70,18 +71,18 @@ impl Storage {
fn setup(db: Db) -> std::io::Result<Self> { fn setup(db: Db) -> std::io::Result<Self> {
let this = Self { let this = Self {
default: db.open_tree(DEFAULT_CF)?, default_cf: db.column_family(DEFAULT_CF).unwrap(),
id2str: db.open_tree(ID2STR_CF)?, id2str_cf: db.column_family(ID2STR_CF).unwrap(),
spog: db.open_tree(SPOG_CF)?, spog_cf: db.column_family(SPOG_CF).unwrap(),
posg: db.open_tree(POSG_CF)?, posg_cf: db.column_family(POSG_CF).unwrap(),
ospg: db.open_tree(OSPG_CF)?, ospg_cf: db.column_family(OSPG_CF).unwrap(),
gspo: db.open_tree(GSPO_CF)?, gspo_cf: db.column_family(GSPO_CF).unwrap(),
gpos: db.open_tree(GPOS_CF)?, gpos_cf: db.column_family(GPOS_CF).unwrap(),
gosp: db.open_tree(GOSP_CF)?, gosp_cf: db.column_family(GOSP_CF).unwrap(),
dspo: db.open_tree(DSPO_CF)?, dspo_cf: db.column_family(DSPO_CF).unwrap(),
dpos: db.open_tree(DPOS_CF)?, dpos_cf: db.column_family(DPOS_CF).unwrap(),
dosp: db.open_tree(DOSP_CF)?, dosp_cf: db.column_family(DOSP_CF).unwrap(),
graphs: db.open_tree(GRAPHS_CF)?, graphs_cf: db.column_family(GRAPHS_CF).unwrap(),
db, db,
}; };
@ -91,7 +92,8 @@ impl Storage {
for quad in this.quads() { for quad in this.quads() {
let quad = quad?; let quad = quad?;
if !quad.graph_name.is_default_graph() { if !quad.graph_name.is_default_graph() {
this.graphs.insert_empty(&encode_term(&quad.graph_name))?; this.db
.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name))?;
} }
} }
version = 1; version = 1;
@ -100,12 +102,12 @@ impl Storage {
} }
if version == 1 { if version == 1 {
// We migrate to v2 // We migrate to v2
let mut iter = this.id2str.iter(); let mut iter = this.db.iter(&this.id2str_cf);
while let (Some(key), Some(value)) = (iter.key(), iter.value()) { while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4); let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes()); new_value.extend_from_slice(&u32::MAX.to_be_bytes());
new_value.extend_from_slice(value); new_value.extend_from_slice(value);
this.id2str.insert(key, &new_value)?; this.db.insert(&this.id2str_cf, key, &new_value)?;
iter.next(); iter.next();
} }
iter.status()?; iter.status()?;
@ -128,37 +130,40 @@ impl Storage {
} }
fn ensure_version(&self) -> std::io::Result<u64> { fn ensure_version(&self) -> std::io::Result<u64> {
Ok(if let Some(version) = self.default.get(b"oxversion")? { Ok(
let mut buffer = [0; 8]; if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
buffer.copy_from_slice(&version); let mut buffer = [0; 8];
u64::from_be_bytes(buffer) buffer.copy_from_slice(&version);
} else { u64::from_be_bytes(buffer)
self.set_version(LATEST_STORAGE_VERSION)?; } else {
LATEST_STORAGE_VERSION self.set_version(LATEST_STORAGE_VERSION)?;
}) LATEST_STORAGE_VERSION
},
)
} }
fn set_version(&self, version: u64) -> std::io::Result<()> { fn set_version(&self, version: u64) -> std::io::Result<()> {
self.default.insert(b"oxversion", &version.to_be_bytes())?; self.db
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
Ok(()) Ok(())
} }
pub fn len(&self) -> usize { pub fn len(&self) -> std::io::Result<usize> {
self.gspo.len() + self.dspo.len() Ok(self.db.len(&self.gspo_cf)? + self.db.len(&self.dspo_cf)?)
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> std::io::Result<bool> {
self.gspo.is_empty() && self.dspo.is_empty() Ok(self.db.is_empty(&self.gspo_cf)? && self.db.is_empty(&self.dspo_cf)?)
} }
pub fn contains(&self, quad: &EncodedQuad) -> std::io::Result<bool> { pub fn contains(&self, quad: &EncodedQuad) -> std::io::Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
if quad.graph_name.is_default_graph() { if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad); write_spo_quad(&mut buffer, quad);
Ok(self.dspo.contains_key(&buffer)?) Ok(self.db.contains_key(&self.dspo_cf, &buffer)?)
} else { } else {
write_gspo_quad(&mut buffer, quad); write_gspo_quad(&mut buffer, quad);
Ok(self.gspo.contains_key(&buffer)?) Ok(self.db.contains_key(&self.gspo_cf, &buffer)?)
} }
} }
@ -398,53 +403,59 @@ impl Storage {
pub fn named_graphs(&self) -> DecodingGraphIterator { pub fn named_graphs(&self) -> DecodingGraphIterator {
DecodingGraphIterator { DecodingGraphIterator {
iter: self.graphs.iter(), iter: self.db.iter(&self.graphs_cf),
} }
} }
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> { pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
self.graphs.contains_key(&encode_term(graph_name)) self.db
.contains_key(&self.graphs_cf, &encode_term(graph_name))
} }
fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.spog, prefix, QuadEncoding::Spog) self.inner_quads(&self.spog_cf, prefix, QuadEncoding::Spog)
} }
fn posg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn posg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.posg, prefix, QuadEncoding::Posg) self.inner_quads(&self.posg_cf, prefix, QuadEncoding::Posg)
} }
fn ospg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn ospg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.ospg, prefix, QuadEncoding::Ospg) self.inner_quads(&self.ospg_cf, prefix, QuadEncoding::Ospg)
} }
fn gspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn gspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gspo, prefix, QuadEncoding::Gspo) self.inner_quads(&self.gspo_cf, prefix, QuadEncoding::Gspo)
} }
fn gpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn gpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gpos, prefix, QuadEncoding::Gpos) self.inner_quads(&self.gpos_cf, prefix, QuadEncoding::Gpos)
} }
fn gosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn gosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.gosp, prefix, QuadEncoding::Gosp) self.inner_quads(&self.gosp_cf, prefix, QuadEncoding::Gosp)
} }
fn dspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn dspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dspo, prefix, QuadEncoding::Dspo) self.inner_quads(&self.dspo_cf, prefix, QuadEncoding::Dspo)
} }
fn dpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn dpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dpos, prefix, QuadEncoding::Dpos) self.inner_quads(&self.dpos_cf, prefix, QuadEncoding::Dpos)
} }
fn dosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator { fn dosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
Self::inner_quads(&self.dosp, prefix, QuadEncoding::Dosp) self.inner_quads(&self.dosp_cf, prefix, QuadEncoding::Dosp)
} }
fn inner_quads(tree: &Tree, prefix: &[u8], encoding: QuadEncoding) -> DecodingQuadIterator { fn inner_quads(
&self,
column_family: &ColumnFamily,
prefix: &[u8],
encoding: QuadEncoding,
) -> DecodingQuadIterator {
DecodingQuadIterator { DecodingQuadIterator {
iter: tree.scan_prefix(prefix), iter: self.db.scan_prefix(column_family, prefix),
encoding, encoding,
} }
} }
@ -455,57 +466,57 @@ impl Storage {
Ok(if quad.graph_name.is_default_graph() { Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, &encoded); write_spo_quad(&mut buffer, &encoded);
if self.dspo.contains_key(buffer.as_slice())? { if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? {
false false
} else { } else {
self.insert_quad_triple(quad, &encoded)?; self.insert_quad_triple(quad, &encoded)?;
self.dspo.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.dspo_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_pos_quad(&mut buffer, &encoded); write_pos_quad(&mut buffer, &encoded);
self.dpos.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.dpos_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_osp_quad(&mut buffer, &encoded); write_osp_quad(&mut buffer, &encoded);
self.dosp.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.dosp_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
true true
} }
} else { } else {
write_spog_quad(&mut buffer, &encoded); write_spog_quad(&mut buffer, &encoded);
if self.spog.contains_key(buffer.as_slice())? { if self.db.contains_key(&self.spog_cf, buffer.as_slice())? {
false false
} else { } else {
self.insert_quad_triple(quad, &encoded)?; self.insert_quad_triple(quad, &encoded)?;
self.spog.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.spog_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_posg_quad(&mut buffer, &encoded); write_posg_quad(&mut buffer, &encoded);
self.posg.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.posg_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_ospg_quad(&mut buffer, &encoded); write_ospg_quad(&mut buffer, &encoded);
self.ospg.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.ospg_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gspo_quad(&mut buffer, &encoded); write_gspo_quad(&mut buffer, &encoded);
self.gspo.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.gspo_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gpos_quad(&mut buffer, &encoded); write_gpos_quad(&mut buffer, &encoded);
self.gpos.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.gpos_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gosp_quad(&mut buffer, &encoded); write_gosp_quad(&mut buffer, &encoded);
self.gosp.insert_empty(buffer.as_slice())?; self.db.insert_empty(&self.gosp_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_term(&mut buffer, &encoded.graph_name); write_term(&mut buffer, &encoded.graph_name);
if !self.graphs.contains_key(&buffer)? { if !self.db.contains_key(&self.graphs_cf, &buffer)? {
self.graphs.insert_empty(&buffer)?; self.db.insert_empty(&self.graphs_cf, &buffer)?;
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?; self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
} }
buffer.clear(); buffer.clear();
@ -525,16 +536,16 @@ impl Storage {
Ok(if quad.graph_name.is_default_graph() { Ok(if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad); write_spo_quad(&mut buffer, quad);
if self.dspo.contains_key(buffer.as_slice())? { if self.db.contains_key(&self.dspo_cf, buffer.as_slice())? {
self.dspo.remove(buffer.as_slice())?; self.db.remove(&self.dspo_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_pos_quad(&mut buffer, quad); write_pos_quad(&mut buffer, quad);
self.dpos.remove(buffer.as_slice())?; self.db.remove(&self.dpos_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_osp_quad(&mut buffer, quad); write_osp_quad(&mut buffer, quad);
self.dosp.remove(buffer.as_slice())?; self.db.remove(&self.dosp_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
self.remove_quad_triple(quad)?; self.remove_quad_triple(quad)?;
@ -546,28 +557,28 @@ impl Storage {
} else { } else {
write_spog_quad(&mut buffer, quad); write_spog_quad(&mut buffer, quad);
if self.spog.contains_key(buffer.as_slice())? { if self.db.contains_key(&self.spog_cf, buffer.as_slice())? {
self.spog.remove(buffer.as_slice())?; self.db.remove(&self.spog_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_posg_quad(&mut buffer, quad); write_posg_quad(&mut buffer, quad);
self.posg.remove(buffer.as_slice())?; self.db.remove(&self.posg_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_ospg_quad(&mut buffer, quad); write_ospg_quad(&mut buffer, quad);
self.ospg.remove(buffer.as_slice())?; self.db.remove(&self.ospg_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gspo_quad(&mut buffer, quad); write_gspo_quad(&mut buffer, quad);
self.gspo.remove(buffer.as_slice())?; self.db.remove(&self.gspo_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gpos_quad(&mut buffer, quad); write_gpos_quad(&mut buffer, quad);
self.gpos.remove(buffer.as_slice())?; self.db.remove(&self.gpos_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
write_gosp_quad(&mut buffer, quad); write_gosp_quad(&mut buffer, quad);
self.gosp.remove(buffer.as_slice())?; self.db.remove(&self.gosp_cf, buffer.as_slice())?;
buffer.clear(); buffer.clear();
self.remove_quad_triple(quad)?; self.remove_quad_triple(quad)?;
@ -582,10 +593,10 @@ impl Storage {
pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> { pub fn insert_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let encoded_graph_name = graph_name.into(); let encoded_graph_name = graph_name.into();
let encoded = encode_term(&encoded_graph_name); let encoded = encode_term(&encoded_graph_name);
Ok(if self.graphs.contains_key(&encoded)? { Ok(if self.db.contains_key(&self.graphs_cf, &encoded)? {
false false
} else { } else {
self.graphs.insert_empty(&encoded)?; self.db.insert_empty(&self.graphs_cf, &encoded)?;
self.insert_term(graph_name.into(), &encoded_graph_name)?; self.insert_term(graph_name.into(), &encoded_graph_name)?;
true true
}) })
@ -618,8 +629,8 @@ impl Storage {
self.remove_encoded(&quad?)?; self.remove_encoded(&quad?)?;
} }
let encoded_graph = encode_term(&graph_name); let encoded_graph = encode_term(&graph_name);
Ok(if self.graphs.contains_key(&encoded_graph)? { Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? {
self.graphs.remove(&encoded_graph)?; self.db.remove(&self.graphs_cf, &encoded_graph)?;
self.remove_term(&graph_name)?; self.remove_term(&graph_name)?;
true true
} else { } else {
@ -628,28 +639,28 @@ impl Storage {
} }
pub fn remove_all_named_graphs(&self) -> std::io::Result<()> { pub fn remove_all_named_graphs(&self) -> std::io::Result<()> {
self.gspo.clear()?; self.db.clear(&self.gspo_cf)?;
self.gpos.clear()?; self.db.clear(&self.gpos_cf)?;
self.gosp.clear()?; self.db.clear(&self.gosp_cf)?;
self.spog.clear()?; self.db.clear(&self.spog_cf)?;
self.posg.clear()?; self.db.clear(&self.posg_cf)?;
self.ospg.clear()?; self.db.clear(&self.ospg_cf)?;
self.graphs.clear()?; self.db.clear(&self.graphs_cf)?;
Ok(()) Ok(())
} }
pub fn clear(&self) -> std::io::Result<()> { pub fn clear(&self) -> std::io::Result<()> {
self.dspo.clear()?; self.db.clear(&self.dspo_cf)?;
self.dpos.clear()?; self.db.clear(&self.dpos_cf)?;
self.dosp.clear()?; self.db.clear(&self.dosp_cf)?;
self.gspo.clear()?; self.db.clear(&self.gspo_cf)?;
self.gpos.clear()?; self.db.clear(&self.gpos_cf)?;
self.gosp.clear()?; self.db.clear(&self.gosp_cf)?;
self.spog.clear()?; self.db.clear(&self.spog_cf)?;
self.posg.clear()?; self.db.clear(&self.posg_cf)?;
self.ospg.clear()?; self.db.clear(&self.ospg_cf)?;
self.graphs.clear()?; self.db.clear(&self.graphs_cf)?;
self.id2str.clear()?; self.db.clear(&self.id2str_cf)?;
Ok(()) Ok(())
} }
@ -659,15 +670,15 @@ impl Storage {
} }
pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> { pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> {
self.id2str self.db
.get(&key.to_be_bytes())? .get(&self.id2str_cf, &key.to_be_bytes())?
.map(|v| String::from_utf8(v[4..].to_vec())) .map(|v| String::from_utf8(v[4..].to_vec()))
.transpose() .transpose()
.map_err(invalid_data_error) .map_err(invalid_data_error)
} }
pub fn contains_str(&self, key: &StrHash) -> std::io::Result<bool> { pub fn contains_str(&self, key: &StrHash) -> std::io::Result<bool> {
self.id2str.contains_key(&key.to_be_bytes()) self.db.contains_key(&self.id2str_cf, &key.to_be_bytes())
} }
} }
@ -745,31 +756,34 @@ impl TermEncoder for Storage {
type Error = std::io::Error; type Error = std::io::Error;
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> { fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
if let Some(value) = self.id2str.get(&key.to_be_bytes())? { if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? {
let mut value = value.to_vec(); let mut value = value.to_vec();
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_add(1); let new_number = number.saturating_add(1);
value[..4].copy_from_slice(&new_number.to_be_bytes()); value[..4].copy_from_slice(&new_number.to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), &value)? self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value)?
} else { } else {
let mut buffer = Vec::with_capacity(value.len() + 4); let mut buffer = Vec::with_capacity(value.len() + 4);
buffer.extend_from_slice(&1_u32.to_be_bytes()); buffer.extend_from_slice(&1_u32.to_be_bytes());
buffer.extend_from_slice(value.as_bytes()); buffer.extend_from_slice(value.as_bytes());
self.id2str.insert(&key.to_be_bytes(), &buffer)?; self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &buffer)?;
} }
Ok(()) Ok(())
} }
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> { fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
if let Some(value) = self.id2str.get(&key.to_be_bytes())? { if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? {
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_sub(1); let new_number = number.saturating_sub(1);
if new_number == 0 { if new_number == 0 {
self.id2str.remove(&key.to_be_bytes())?; self.db.remove(&self.id2str_cf, &key.to_be_bytes())?;
} else { } else {
let mut value = value.to_vec(); let mut value = value.to_vec();
value[..4].copy_from_slice(&new_number.to_be_bytes()); value[..4].copy_from_slice(&new_number.to_be_bytes());
self.id2str.insert(&key.to_be_bytes(), &value)?; self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value)?;
} }
} }
Ok(()) Ok(())

@ -169,19 +169,13 @@ impl Db {
} }
} }
pub fn open_tree(&self, name: &'static str) -> Result<Tree> { pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
for (cf_name, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) { for (cf_name, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) {
if *cf_name == name { if *cf_name == name {
return Ok(Tree { return Some(ColumnFamily(*cf_handle));
db: self.0.clone(),
cf_handle: *cf_handle,
});
} }
} }
Err(other_error(format!( None
"The column family {} does not exist",
name
)))
} }
pub fn flush(&self) -> Result<()> { pub fn flush(&self) -> Result<()> {
@ -196,19 +190,12 @@ impl Db {
r r
} }
} }
}
#[derive(Clone)]
pub struct Tree {
db: Arc<DbHandler>,
cf_handle: *mut rocksdb_column_family_handle_t,
}
unsafe impl Send for Tree {} pub fn get(
unsafe impl Sync for Tree {} &self,
column_family: &ColumnFamily,
impl Tree { key: &[u8],
pub fn get(&self, key: &[u8]) -> Result<Option<PinnableSlice<'_>>> { ) -> Result<Option<PinnableSlice<'_>>> {
unsafe { unsafe {
let options = rocksdb_readoptions_create(); let options = rocksdb_readoptions_create();
assert!( assert!(
@ -216,9 +203,9 @@ impl Tree {
"rocksdb_readoptions_create returned null" "rocksdb_readoptions_create returned null"
); );
let r = ffi_result!(rocksdb_get_pinned_cf( let r = ffi_result!(rocksdb_get_pinned_cf(
self.db.db, self.0.db,
options, options,
self.cf_handle, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len() key.len()
)); ));
@ -235,11 +222,11 @@ impl Tree {
} }
} }
pub fn contains_key(&self, key: &[u8]) -> Result<bool> { pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self.get(key)?.is_some()) //TODO: optimize Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
} }
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe { unsafe {
let options = rocksdb_writeoptions_create(); let options = rocksdb_writeoptions_create();
assert!( assert!(
@ -247,9 +234,9 @@ impl Tree {
"rocksdb_writeoptions_create returned null" "rocksdb_writeoptions_create returned null"
); );
let r = ffi_result!(rocksdb_put_cf( let r = ffi_result!(rocksdb_put_cf(
self.db.db, self.0.db,
options, options,
self.cf_handle, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len(), key.len(),
value.as_ptr() as *const c_char, value.as_ptr() as *const c_char,
@ -260,11 +247,11 @@ impl Tree {
} }
} }
pub fn insert_empty(&self, key: &[u8]) -> Result<()> { pub fn insert_empty(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.insert(key, &[]) self.insert(column_family, key, &[])
} }
pub fn remove(&self, key: &[u8]) -> Result<()> { pub fn remove(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
unsafe { unsafe {
let options = rocksdb_writeoptions_create(); let options = rocksdb_writeoptions_create();
assert!( assert!(
@ -272,9 +259,9 @@ impl Tree {
"rocksdb_writeoptions_create returned null" "rocksdb_writeoptions_create returned null"
); );
let r = ffi_result!(rocksdb_delete_cf( let r = ffi_result!(rocksdb_delete_cf(
self.db.db, self.0.db,
options, options,
self.cf_handle, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len() key.len()
)); ));
@ -283,7 +270,7 @@ impl Tree {
} }
} }
pub fn clear(&self) -> Result<()> { pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe { unsafe {
let options = rocksdb_writeoptions_create(); let options = rocksdb_writeoptions_create();
assert!( assert!(
@ -293,9 +280,9 @@ impl Tree {
let start = []; let start = [];
let end = [c_char::MAX; 257]; let end = [c_char::MAX; 257];
let r = ffi_result!(rocksdb_delete_range_cf( let r = ffi_result!(rocksdb_delete_range_cf(
self.db.db, self.0.db,
options, options,
self.cf_handle, column_family.0,
start.as_ptr(), start.as_ptr(),
start.len(), start.len(),
end.as_ptr(), end.as_ptr(),
@ -306,18 +293,18 @@ impl Tree {
} }
} }
pub fn iter(&self) -> Iter { pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(&[]) self.scan_prefix(column_family, &[])
} }
pub fn scan_prefix(&self, prefix: &[u8]) -> Iter { pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter {
unsafe { unsafe {
let options = rocksdb_readoptions_create(); let options = rocksdb_readoptions_create();
assert!( assert!(
!options.is_null(), !options.is_null(),
"rocksdb_readoptions_create returned null" "rocksdb_readoptions_create returned null"
); );
let iter = rocksdb_create_iterator_cf(self.db.db, options, self.cf_handle); let iter = rocksdb_create_iterator_cf(self.0.db, options, column_family.0);
assert!(!options.is_null(), "rocksdb_create_iterator returned null"); assert!(!options.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() { if prefix.is_empty() {
rocksdb_iter_seek_to_first(iter); rocksdb_iter_seek_to_first(iter);
@ -328,26 +315,37 @@ impl Tree {
iter, iter,
_options: options, _options: options,
prefix: prefix.to_vec(), prefix: prefix.to_vec(),
_db: self.db.clone(), _db: self.0.clone(),
} }
} }
} }
pub fn len(&self) -> usize { pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
let mut count = 0; let mut count = 0;
let mut iter = self.iter(); let mut iter = self.iter(column_family);
while iter.is_valid() { while iter.is_valid() {
count += 1; count += 1;
iter.next(); iter.next();
} }
count iter.status()?; // We makes sure there is no read problem
Ok(count)
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
!self.iter().is_valid() let iter = self.iter(column_family);
iter.status()?; // We makes sure there is no read problem
Ok(!iter.is_valid())
} }
} }
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
// So, no use after free possible.
#[derive(Clone)]
pub struct ColumnFamily(*mut rocksdb_column_family_handle_t);
unsafe impl Send for ColumnFamily {}
unsafe impl Sync for ColumnFamily {}
pub struct PinnableSlice<'a> { pub struct PinnableSlice<'a> {
slice: *mut rocksdb_pinnableslice_t, slice: *mut rocksdb_pinnableslice_t,
lifetime: PhantomData<&'a ()>, lifetime: PhantomData<&'a ()>,

@ -178,12 +178,12 @@ impl Store {
/// Returns the number of quads in the store /// Returns the number of quads in the store
/// ///
/// Warning: this function executes a full scan /// Warning: this function executes a full scan
pub fn len(&self) -> usize { pub fn len(&self) -> io::Result<usize> {
self.storage.len() self.storage.len()
} }
/// Returns if the store is empty /// Returns if the store is empty
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> io::Result<bool> {
self.storage.is_empty() self.storage.is_empty()
} }
@ -491,10 +491,10 @@ impl Store {
/// let quad = QuadRef::new(ex, ex, ex, ex); /// let quad = QuadRef::new(ex, ex, ex, ex);
/// let store = Store::new()?; /// let store = Store::new()?;
/// store.insert(quad)?; /// store.insert(quad)?;
/// assert_eq!(1, store.len()); /// assert_eq!(1, store.len()?);
/// ///
/// store.clear_graph(ex)?; /// store.clear_graph(ex)?;
/// assert_eq!(0, store.len()); /// assert_eq!(store.is_empty()?);
/// assert_eq!(1, store.named_graphs().count()); /// assert_eq!(1, store.named_graphs().count());
/// # Result::<_,Box<dyn std::error::Error>>::Ok(()) /// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
@ -515,10 +515,10 @@ impl Store {
/// let quad = QuadRef::new(ex, ex, ex, ex); /// let quad = QuadRef::new(ex, ex, ex, ex);
/// let store = Store::new()?; /// let store = Store::new()?;
/// store.insert(quad)?; /// store.insert(quad)?;
/// assert_eq!(1, store.len()); /// assert_eq!(1, store.len()?);
/// ///
/// store.remove_named_graph(ex)?; /// store.remove_named_graph(ex)?;
/// assert!(store.is_empty()); /// assert!(store.is_empty()?);
/// assert_eq!(0, store.named_graphs().count()); /// assert_eq!(0, store.named_graphs().count());
/// # Result::<_,Box<dyn std::error::Error>>::Ok(()) /// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
@ -540,10 +540,10 @@ impl Store {
/// let store = Store::new()?; /// let store = Store::new()?;
/// store.insert(QuadRef::new(ex, ex, ex, ex))?; /// store.insert(QuadRef::new(ex, ex, ex, ex))?;
/// store.insert(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?; /// store.insert(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?;
/// assert_eq!(2, store.len()); /// assert_eq!(2, store.len()?);
/// ///
/// store.clear()?; /// store.clear()?;
/// assert!(store.is_empty()); /// assert!(store.is_empty()?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(()) /// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
pub fn clear(&self) -> io::Result<()> { pub fn clear(&self) -> io::Result<()> {
@ -676,7 +676,7 @@ fn store() -> io::Result<()> {
assert!(store.insert(&default_quad)?); assert!(store.insert(&default_quad)?);
assert!(!store.insert(&default_quad)?); assert!(!store.insert(&default_quad)?);
assert_eq!(store.len(), 4); assert_eq!(store.len()?, 4);
assert_eq!(store.iter().collect::<Result<Vec<_>, _>>()?, all_quads); assert_eq!(store.iter().collect::<Result<Vec<_>, _>>()?, all_quads);
assert_eq!( assert_eq!(
store store

Loading…
Cancel
Save