Adds back in-memory WASM support

pull/173/head
Tpt 3 years ago
parent ed17e86853
commit 8d20f65890
  1. lib/src/model/xsd/date_time.rs (1 line changed)
  2. lib/src/sparql/update.rs (10 lines changed)
  3. lib/src/storage/backend/fallback.rs (420 lines changed)
  4. lib/src/storage/backend/mod.rs (2 lines changed)
  5. lib/src/storage/backend/rocksdb.rs (208 lines changed)
  6. lib/src/storage/binary_encoder.rs (1 line changed)
  7. lib/src/storage/mod.rs (56 lines changed)
  8. lib/src/store.rs (10 lines changed)

@@ -1340,6 +1340,7 @@ impl Timestamp {
     }
 }
+#[allow(clippy::unnecessary_wraps)]
 #[cfg(target_arch = "wasm32")]
 fn since_unix_epoch() -> Result<Duration, DateTimeError> {
     Ok(Duration::new(

@@ -32,14 +32,14 @@ pub fn evaluate_update(
     options: UpdateOptions,
 ) -> Result<(), EvaluationError> {
     let base_iri = update.inner.base_iri.map(Rc::new);
-    let client = Client::new(options.query_options.http_timeout);
     storage
         .transaction(move |transaction| {
+            let client = Client::new(options.query_options.http_timeout);
             SimpleUpdateEvaluator {
                 transaction,
                 base_iri: base_iri.clone(),
-                options: &options,
-                client: &client,
+                options: options.clone(),
+                client,
             }
             .eval_all(&update.inner.operations, &update.using_datasets)
             .map_err(|e| match e {
@@ -62,8 +62,8 @@ pub fn evaluate_update(
 struct SimpleUpdateEvaluator<'a> {
     transaction: StorageWriter<'a>,
     base_iri: Option<Rc<Iri<String>>>,
-    options: &'a UpdateOptions,
-    client: &'a Client,
+    options: UpdateOptions,
+    client: Client,
 }
 impl SimpleUpdateEvaluator<'_> {

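The shape of this change, as I read it: the closure handed to Storage::transaction is an Fn that the backend may run more than once (it retries on conflicts), so per-attempt state such as the HTTP Client is now created inside the closure, and SimpleUpdateEvaluator owns its UpdateOptions and Client instead of borrowing them. A hedged sketch of the pattern, where run_once is a hypothetical helper standing in for building and running SimpleUpdateEvaluator:

    storage.transaction(move |transaction| {
        // Fresh per attempt, because the closure may be re-run.
        let client = Client::new(options.query_options.http_timeout);
        run_once(transaction, options.clone(), client)
    })?;
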
@ -1,368 +1,246 @@
//! TODO: This storage is dramatically naive. //! TODO: This storage is dramatically naive.
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use crate::storage::backend::{CompactionAction, CompactionFilter}; use std::cell::RefCell;
use std::collections::{hash_map, BTreeMap, HashMap, HashSet}; use std::collections::{BTreeMap, HashMap};
use std::ffi::CString; use std::io::Result;
use std::io::{Error, Result}; use std::mem::transmute;
use std::sync::{Arc, RwLock}; use std::rc::{Rc, Weak};
use std::sync::{Arc, RwLock, RwLockWriteGuard};
pub struct ColumnFamilyDefinition { pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
pub use_iter: bool, pub use_iter: bool,
pub min_prefix_size: usize, pub min_prefix_size: usize,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<DbInternals>); pub struct Db(Arc<RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>);
#[derive(Default)]
struct DbInternals {
trees: RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>,
merge_operators: HashMap<ColumnFamily, MergeOperator>,
compaction_filters: HashMap<ColumnFamily, CompactionFilter>,
}
impl Db { impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> { pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
let mut trees = HashMap::new(); let mut trees = HashMap::new();
let mut merge_operators = HashMap::new();
let mut compaction_filters = HashMap::new();
for cf in column_families { for cf in column_families {
let name = ColumnFamily(cf.name); trees.insert(ColumnFamily(cf.name), BTreeMap::default());
trees.insert(name.clone(), BTreeMap::default());
if let Some(me) = cf.merge_operator {
merge_operators.insert(name.clone(), me);
}
if let Some(cf) = cf.compaction_filter {
compaction_filters.insert(name.clone(), cf);
}
} }
trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
Ok(Self(Arc::new(DbInternals { Ok(Self(Arc::new(RwLock::new(trees))))
trees: RwLock::new(trees),
merge_operators,
compaction_filters,
})))
} }
pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> { pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
let name = ColumnFamily(name); let name = ColumnFamily(name);
if self.0.trees.read().unwrap().contains_key(&name) { if self.0.read().unwrap().contains_key(&name) {
Some(name) Some(name)
} else { } else {
None None
} }
} }
pub fn flush(&self, _column_family: &ColumnFamily) -> Result<()> { #[must_use]
Ok(()) pub fn snapshot(&self) -> Reader {
Reader(InnerReader::Simple(self.0.clone()))
}
pub fn transaction<'a, 'b: 'a, T>(
&'b self,
f: impl Fn(Transaction<'a>) -> Result<T>,
) -> Result<T> {
f(Transaction(Rc::new(RefCell::new(self.0.write().unwrap()))))
}
} }
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ColumnFamily(&'static str);
pub struct Reader(InnerReader);
enum InnerReader {
Simple(Arc<RwLock<HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>),
Transaction(
Weak<RefCell<RwLockWriteGuard<'static, HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>>,
),
}
impl Reader {
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> { pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self match &self.0 {
.0 InnerReader::Simple(reader) => Ok(reader
.trees
.read() .read()
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.and_then(|cf| cf.get(key).cloned())),
InnerReader::Transaction(reader) => {
if let Some(reader) = reader.upgrade() {
Ok((*reader)
.borrow()
.get(column_family)
.and_then(|cf| cf.get(key).cloned())) .and_then(|cf| cf.get(key).cloned()))
} else {
Err(invalid_input_error("The transaction is already ended"))
}
}
}
} }
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> { pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> {
Ok(self match &self.0 {
.0 InnerReader::Simple(reader) => Ok(reader
.trees
.read() .read()
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.map_or(false, |cf| cf.contains_key(key))),
InnerReader::Transaction(reader) => {
if let Some(reader) = reader.upgrade() {
Ok((*reader)
.borrow()
.get(column_family)
.map_or(false, |cf| cf.contains_key(key))) .map_or(false, |cf| cf.contains_key(key)))
}
pub fn new_batch(&self) -> WriteBatchWithIndex {
WriteBatchWithIndex {
by_cf: HashMap::new(),
db: self.clone(),
error: None,
}
}
pub fn write(&self, batch: &mut WriteBatchWithIndex) -> Result<()> {
if let Some(error) = batch.error.take() {
return Err(error);
}
let mut trees = self.0.trees.write().unwrap();
for (cf, ops) in batch.by_cf.drain() {
let tree = trees.get_mut(&cf).ok_or_else(|| {
invalid_input_error(format!("Unsupported column family {}", cf.0))
})?;
for k in ops.to_remove {
tree.remove(&k);
}
for (k, v) in ops.to_insert {
tree.insert(k, v);
}
for (k, v) in ops.to_merge {
let v = self.exec_merge(&cf, &k, tree.get(&k).map(|v| v.as_slice()), &v)?;
tree.insert(k, v);
}
}
Ok(())
}
/*pub fn insert(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
_low_priority: bool,
) -> Result<()> {
let mut db = self.0.write().unwrap();
let tree = db.get_mut(column_family).unwrap();
if let Some(value) = Self::exec_filter(tree, key, value.into()) {
tree.tree.insert(key.into(), value.into())
} else { } else {
tree.tree.remove(key) Err(invalid_input_error("The transaction is already ended"))
};
Ok(())
} }
pub fn insert_empty(
&self,
column_family: &ColumnFamily,
key: &[u8],
low_priority: bool,
) -> Result<()> {
self.insert(column_family, key, &[], low_priority)
} }
pub fn remove(
&self,
column_family: &ColumnFamily,
key: &[u8],
_low_priority: bool,
) -> Result<bool> {
Ok(self
.0
.write()
.unwrap()
.get_mut(column_family)
.unwrap()
.tree
.remove(key.as_ref())
.is_some())
} }
pub fn merge(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
_low_priority: bool,
) -> Result<()> {
let mut db = self.0.write().unwrap();
let tree = db.get_mut(column_family).unwrap();
match tree.tree.entry(key.into()) {
Entry::Vacant(e) => {
if let Some(value) =
Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value))
{
e.insert(value);
}
}
Entry::Occupied(mut e) => {
if let Some(value) =
Self::exec_filter(tree, key, Self::exec_merge(tree, key, None, value))
{
e.insert(value);
} else {
e.remove();
}
}
}
Ok(())
}*/
fn exec_merge(
&self,
cf: &ColumnFamily,
key: &[u8],
base: Option<&[u8]>,
value: &[u8],
) -> Result<Vec<u8>> {
let merge = self.0.merge_operators.get(cf).ok_or_else(|| {
invalid_input_error(format!("The column family {} has no merge operator", cf.0))
})?;
Ok((merge.full)(key, base, vec![value].into_iter()))
} }
fn exec_partial_merge( pub fn iter(&self, column_family: &ColumnFamily) -> Result<Iter> {
&self, self.scan_prefix(column_family, &[])
cf: &ColumnFamily,
key: &[u8],
a: &[u8],
b: &[u8],
) -> Result<Vec<u8>> {
let merge = self.0.merge_operators.get(cf).ok_or_else(|| {
invalid_input_error(format!("The column family {} has no merge operator", cf.0))
})?;
Ok((merge.partial)(key, vec![a, b].into_iter()))
} }
fn exec_filter(&self, cf: &ColumnFamily, key: &[u8], value: Vec<u8>) -> Option<Vec<u8>> { pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Result<Iter> {
let action = if let Some(filter) = self.0.compaction_filters.get(cf) { let data: Vec<_> = match &self.0 {
(filter.filter)(key, &value) InnerReader::Simple(reader) => {
let trees = reader.read().unwrap();
let tree = if let Some(tree) = trees.get(column_family) {
tree
} else { } else {
CompactionAction::Keep return Ok(Iter {
iter: Vec::new().into_iter(),
current: None,
});
}; };
match action { if prefix.is_empty() {
CompactionAction::Keep => Some(value), tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
CompactionAction::Remove => None, } else {
} tree.range(prefix.to_vec()..)
.take_while(|(k, _)| k.starts_with(prefix))
.map(|(k, v)| (k.clone(), v.clone()))
.collect()
} }
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[])
} }
InnerReader::Transaction(reader) => {
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter { if let Some(reader) = reader.upgrade() {
let trees = self.0.trees.read().unwrap(); let trees = (*reader).borrow();
let tree = if let Some(tree) = trees.get(column_family) { let tree = if let Some(tree) = trees.get(column_family) {
tree tree
} else { } else {
return Iter { return Ok(Iter {
iter: Vec::new().into_iter(), iter: Vec::new().into_iter(),
current: None, current: None,
});
}; };
}; if prefix.is_empty() {
let data: Vec<_> = if prefix.is_empty() {
tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect() tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} else { } else {
tree.range(prefix.to_vec()..) tree.range(prefix.to_vec()..)
.take_while(|(k, _)| k.starts_with(prefix)) .take_while(|(k, _)| k.starts_with(prefix))
.map(|(k, v)| (k.clone(), v.clone())) .map(|(k, v)| (k.clone(), v.clone()))
.collect() .collect()
}
} else {
return Err(invalid_input_error("The transaction is already ended"));
}
}
}; };
let mut iter = data.into_iter(); let mut iter = data.into_iter();
let current = iter.next(); let current = iter.next();
Iter { iter, current } Ok(Iter { iter, current })
} }
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> { pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
Ok(self match &self.0 {
.0 InnerReader::Simple(reader) => Ok(reader
.trees
.read() .read()
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.map_or(0, |tree| tree.len())),
InnerReader::Transaction(reader) => {
if let Some(reader) = reader.upgrade() {
Ok((*reader)
.borrow()
.get(column_family)
.map_or(0, |tree| tree.len())) .map_or(0, |tree| tree.len()))
} else {
Err(invalid_input_error("The transaction is already ended"))
}
}
}
} }
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> { pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
Ok(self match &self.0 {
.0 InnerReader::Simple(reader) => Ok(reader
.trees
.read() .read()
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.map_or(true, |tree| tree.is_empty())),
InnerReader::Transaction(reader) => {
if let Some(reader) = reader.upgrade() {
Ok((*reader)
.borrow()
.get(column_family)
.map_or(true, |tree| tree.is_empty())) .map_or(true, |tree| tree.is_empty()))
} else {
Err(invalid_input_error("The transaction is already ended"))
} }
} }
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq, Hash)]
pub struct ColumnFamily(&'static str);
pub struct WriteBatchWithIndex {
by_cf: HashMap<ColumnFamily, WriteBatchWithIndexCF>,
db: Db,
error: Option<Error>,
}
#[derive(Default)]
struct WriteBatchWithIndexCF {
// Evaluation order insert/remove then merge
to_insert: HashMap<Vec<u8>, Vec<u8>>,
to_merge: HashMap<Vec<u8>, Vec<u8>>,
to_remove: HashSet<Vec<u8>>,
} }
impl WriteBatchWithIndex {
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) {
let cf_state = self.by_cf.entry(column_family.clone()).or_default();
cf_state.to_insert.insert(key.into(), value.into());
cf_state.to_merge.remove(key);
cf_state.to_remove.remove(key);
} }
pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) {
self.insert(column_family, key, &[])
} }
pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) { pub struct Transaction<'a>(
let cf_state = self.by_cf.entry(column_family.clone()).or_default(); Rc<RefCell<RwLockWriteGuard<'a, HashMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>>,
cf_state.to_insert.remove(key); );
cf_state.to_merge.remove(key);
cf_state.to_remove.insert(key.into());
}
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) { impl Transaction<'_> {
let cf_state = self.by_cf.entry(column_family.clone()).or_default(); #[allow(unsafe_code)]
match cf_state.to_merge.entry(key.into()) { pub fn reader(&self) -> Reader {
hash_map::Entry::Vacant(e) => { // This transmute is safe because we take a weak reference and the only Rc reference used is guarded by the lifetime.
e.insert(value.into()); Reader(InnerReader::Transaction(Rc::downgrade(unsafe {
} transmute(&self.0)
hash_map::Entry::Occupied(mut e) => { })))
match self
.db
.exec_partial_merge(column_family, key, e.get(), value)
{
Ok(value) => {
e.insert(value);
}
Err(e) => self.error = Some(e),
}
}
}
} }
pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<Vec<u8>>> { pub fn contains_key_for_update(
if let Some(cf_state) = self.by_cf.get(column_family) { &self,
let value = if cf_state.to_remove.contains(key) { column_family: &ColumnFamily,
None key: &[u8],
} else if let Some(value) = cf_state.to_insert.get(key) { ) -> Result<bool> {
Some(value.clone()) Ok((*self.0)
} else { .borrow()
self.db.get(column_family, key)? .get(column_family)
}; .map_or(false, |cf| cf.contains_key(key)))
Ok(if let Some(merge) = cf_state.to_merge.get(key) {
Some(
self.db
.exec_merge(column_family, key, value.as_deref(), merge)?,
)
} else {
value
}
.and_then(|value| self.db.exec_filter(column_family, key, value)))
} else {
self.db.get(column_family, key)
}
} }
pub fn contains_key(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<bool> { pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize self.0
.borrow_mut()
.get_mut(column_family)
.unwrap()
.insert(key.into(), value.into());
Ok(())
} }
pub fn clear(&mut self) { pub fn insert_empty(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.by_cf.clear(); self.insert(column_family, key, &[])
} }
pub fn len(&self) -> usize { pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
self.by_cf self.0
.values() .borrow_mut()
.map(|v| v.to_insert.len() + v.to_remove.len() + v.to_merge.len()) .get_mut(column_family)
.sum() .unwrap()
.remove(key);
Ok(())
} }
} }
@ -388,11 +266,3 @@ impl Iter {
Ok(()) Ok(())
} }
} }
pub struct MergeOperator {
pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec<u8>,
pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec<u8>,
pub name: CString,
}
pub type SlicesIterator<'a> = std::vec::IntoIter<&'a [u8]>;
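
For orientation, a minimal sketch (written as crate-internal code) of how the restored in-memory backend above is meant to be driven; the "example" column family and the key/value bytes are made up for illustration:

    use crate::storage::backend::{ColumnFamilyDefinition, Db};
    use std::io::Result;

    fn demo() -> Result<()> {
        // Each column family is a plain BTreeMap behind one shared RwLock.
        let db = Db::new(vec![ColumnFamilyDefinition {
            name: "example",
            use_iter: true,
            min_prefix_size: 0,
        }])?;
        let cf = db.column_family("example").unwrap();
        // The closure receives an owned Transaction wrapping the write lock.
        db.transaction(|mut t| t.insert(&cf, b"key", b"value"))?;
        // A "snapshot" reads the same shared map (no real isolation in this fallback).
        assert_eq!(
            db.snapshot().get(&cf, b"key")?.as_deref(),
            Some(b"value".as_ref())
        );
        Ok(())
    }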

@@ -2,7 +2,7 @@
 //! RocksDB is available, if not in memory
 #[cfg(target_arch = "wasm32")]
-pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, WriteBatchWithIndex};
+pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
 #[cfg(not(target_arch = "wasm32"))]
 pub use rocksdb::{
     ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, SstFileWriter, Transaction,

@@ -10,14 +10,14 @@ use libc::{self, c_char, c_void, free};
 use oxrocksdb_sys::*;
 use rand::random;
 use std::borrow::Borrow;
-use std::cell::Cell;
 use std::env::temp_dir;
 use std::ffi::{CStr, CString};
 use std::fs::remove_dir_all;
 use std::io::{Error, ErrorKind, Result};
+use std::marker::PhantomData;
 use std::ops::Deref;
 use std::path::{Path, PathBuf};
-use std::rc::Rc;
+use std::rc::{Rc, Weak};
 use std::sync::Arc;
 use std::{ptr, slice};
@@ -321,16 +321,6 @@ impl Db {
         None
     }

-    /// Unsafe reader (data might appear and disapear between two reads)
-    /// Use [`snapshot`] if you don't want that.
-    #[must_use]
-    pub fn reader(&self) -> Reader {
-        Reader {
-            inner: InnerReader::Raw(self.0.clone()),
-            options: unsafe { rocksdb_readoptions_create_copy(self.0.read_options) },
-        }
-    }
-
     #[must_use]
     pub fn snapshot(&self) -> Reader {
         unsafe {
@ -351,34 +341,12 @@ impl Db {
} }
} }
pub fn transaction<T>(&self, f: impl Fn(&mut Transaction) -> Result<T>) -> Result<T> { pub fn transaction<'a, 'b: 'a, T>(
&'b self,
f: impl Fn(Transaction<'a>) -> Result<T>,
) -> Result<T> {
loop { loop {
let mut transaction = self.build_transaction(); let transaction = unsafe {
match { f(&mut transaction) } {
Ok(result) => {
transaction.commit()?;
return Ok(result);
}
Err(e) => {
transaction.rollback()?;
let is_conflict_error = e.get_ref().map_or(false, |e| {
let msg = e.to_string();
msg == "Resource busy: "
|| msg == "Operation timed out: Timeout waiting to lock key"
});
if is_conflict_error {
// We retry
continue;
}
return Err(e);
}
}
}
}
#[must_use]
fn build_transaction(&self) -> Transaction {
unsafe {
let transaction = rocksdb_transaction_begin( let transaction = rocksdb_transaction_begin(
self.0.db, self.0.db,
self.0.write_options, self.0.write_options,
@ -389,18 +357,46 @@ impl Db {
!transaction.is_null(), !transaction.is_null(),
"rocksdb_transaction_begin returned null" "rocksdb_transaction_begin returned null"
); );
transaction
};
let read_options = unsafe {
let options = rocksdb_readoptions_create_copy(self.0.read_options); let options = rocksdb_readoptions_create_copy(self.0.read_options);
rocksdb_readoptions_set_snapshot( rocksdb_readoptions_set_snapshot(
options, options,
rocksdb_transaction_get_snapshot(transaction), rocksdb_transaction_get_snapshot(transaction),
); );
Transaction { options
inner: Rc::new(InnerTransaction { };
transaction, let result = f(Transaction {
is_ended: Cell::new(false), transaction: Rc::new(transaction),
_db: self.0.clone(), read_options,
}), _lifetime: PhantomData::default(),
read_options: options, });
match result {
Ok(result) => {
unsafe {
ffi_result!(rocksdb_transaction_commit(transaction))?;
rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options);
}
return Ok(result);
}
Err(e) => {
unsafe {
ffi_result!(rocksdb_transaction_rollback(transaction))?;
rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options);
}
let is_conflict_error = e.get_ref().map_or(false, |e| {
let msg = e.to_string();
msg == "Resource busy: "
|| msg == "Operation timed out: Timeout waiting to lock key"
});
if !is_conflict_error {
// We raise the error
return Err(e);
}
}
} }
} }
} }
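
Reading the rewritten wrapper above: the closure's result decides the outcome. Ok commits, Err rolls back, and if the error is one of the RocksDB conflict messages the whole closure runs again. A caller-side sketch, with a Db and ColumnFamily assumed to be in scope:

    // Sketch only: the closure should be idempotent, since it may execute several
    // times before a conflict-free attempt commits.
    fn set_flag(db: &Db, cf: &ColumnFamily) -> std::io::Result<()> {
        db.transaction(|mut t| {
            // If an operation in here reports "Resource busy" or a lock timeout,
            // the transaction is rolled back and this closure is re-run.
            t.insert(cf, b"flag", b"1")
        })
    }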
@@ -473,9 +469,8 @@ pub struct Reader {
 #[derive(Clone)]
 enum InnerReader {
-    Raw(Arc<DbHandler>),
     Snapshot(Rc<InnerSnapshot>),
-    Transaction(Rc<InnerTransaction>),
+    Transaction(Weak<*mut rocksdb_transaction_t>),
 }

 struct InnerSnapshot {
@@ -508,13 +503,6 @@ impl Reader {
     pub fn get(&self, column_family: &ColumnFamily, key: &[u8]) -> Result<Option<PinnableSlice>> {
         unsafe {
             let slice = match &self.inner {
-                InnerReader::Raw(inner) => ffi_result!(rocksdb_transactiondb_get_pinned_cf(
-                    inner.db,
-                    self.options,
-                    column_family.0,
-                    key.as_ptr() as *const c_char,
-                    key.len()
-                )),
                 InnerReader::Snapshot(inner) => ffi_result!(rocksdb_transactiondb_get_pinned_cf(
                     inner.db.db,
                     self.options,
@@ -523,16 +511,17 @@ impl Reader {
                     key.len()
                 )),
                 InnerReader::Transaction(inner) => {
-                    if inner.is_ended.get() {
-                        return Err(invalid_input_error("The transaction is already ended"));
-                    }
+                    if let Some(inner) = inner.upgrade() {
                     ffi_result!(rocksdb_transaction_get_pinned_cf(
-                        inner.transaction,
+                        *inner,
                         self.options,
                         column_family.0,
                         key.as_ptr() as *const c_char,
                         key.len()
                     ))
+                    } else {
+                        return Err(invalid_input_error("The transaction is already ended"));
+                    }
                 }
             }?;
             Ok(if slice.is_null() {
@@ -584,21 +573,15 @@ impl Reader {
             );
         }
         let iter = match &self.inner {
-            InnerReader::Raw(inner) => {
-                rocksdb_transactiondb_create_iterator_cf(inner.db, options, column_family.0)
-            }
             InnerReader::Snapshot(inner) => {
                 rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0)
             }
             InnerReader::Transaction(inner) => {
-                if inner.is_ended.get() {
-                    return Err(invalid_input_error("The transaction is already ended"));
-                }
-                rocksdb_transaction_create_iterator_cf(
-                    inner.transaction,
-                    options,
-                    column_family.0,
-                )
+                if let Some(inner) = inner.upgrade() {
+                    rocksdb_transaction_create_iterator_cf(*inner, options, column_family.0)
+                } else {
+                    return Err(invalid_input_error("The transaction is already ended"));
+                }
             }
         };
         assert!(!iter.is_null(), "rocksdb_create_iterator returned null");
@ -636,83 +619,41 @@ impl Reader {
} }
} }
pub struct Transaction { pub struct Transaction<'a> {
inner: Rc<InnerTransaction>, transaction: Rc<*mut rocksdb_transaction_t>,
read_options: *mut rocksdb_readoptions_t, read_options: *mut rocksdb_readoptions_t,
_lifetime: PhantomData<&'a ()>,
} }
struct InnerTransaction { impl Transaction<'_> {
transaction: *mut rocksdb_transaction_t,
is_ended: Cell<bool>,
_db: Arc<DbHandler>, // Used to ensure that the transaction is not outliving the DB
}
impl Drop for Transaction {
fn drop(&mut self) {
unsafe { rocksdb_readoptions_destroy(self.read_options) }
}
}
impl Drop for InnerTransaction {
fn drop(&mut self) {
if !self.is_ended.get() {
unsafe { ffi_result!(rocksdb_transaction_rollback(self.transaction)) }.unwrap();
}
unsafe { rocksdb_transaction_destroy(self.transaction) }
}
}
impl Transaction {
fn commit(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_commit(self.inner.transaction)) }
}
fn rollback(self) -> Result<()> {
self.inner.is_ended.set(true);
unsafe { ffi_result!(rocksdb_transaction_rollback(self.inner.transaction)) }
}
pub fn reader(&self) -> Reader { pub fn reader(&self) -> Reader {
Reader { Reader {
inner: InnerReader::Transaction(self.inner.clone()), inner: InnerReader::Transaction(Rc::downgrade(&self.transaction)),
options: unsafe { rocksdb_readoptions_create_copy(self.read_options) }, options: unsafe { rocksdb_readoptions_create_copy(self.read_options) },
} }
} }
pub fn get_for_update( pub fn contains_key_for_update(
&self, &self,
column_family: &ColumnFamily, column_family: &ColumnFamily,
key: &[u8], key: &[u8],
) -> Result<Option<PinnableSlice>> { ) -> Result<bool> {
unsafe { unsafe {
let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf( let slice = ffi_result!(rocksdb_transaction_get_for_update_pinned_cf(
self.inner.transaction, *self.transaction,
self.read_options, self.read_options,
column_family.0, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len() key.len()
))?; ))?;
Ok(if slice.is_null() { Ok(!slice.is_null())
None
} else {
Some(PinnableSlice(slice))
})
}
} }
pub fn contains_key_for_update(
&self,
column_family: &ColumnFamily,
key: &[u8],
) -> Result<bool> {
Ok(self.get_for_update(column_family, key)?.is_some()) //TODO: optimize
} }
pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe { unsafe {
ffi_result!(rocksdb_transaction_put_cf( ffi_result!(rocksdb_transaction_put_cf(
self.inner.transaction, *self.transaction,
column_family.0, column_family.0,
key.as_ptr() as *const c_char, key.as_ptr() as *const c_char,
key.len(), key.len(),
@@ -729,7 +670,7 @@ impl Transaction {
     pub fn remove(&mut self, column_family: &ColumnFamily, key: &[u8]) -> Result<()> {
         unsafe {
             ffi_result!(rocksdb_transaction_delete_cf(
-                self.inner.transaction,
+                *self.transaction,
                 column_family.0,
                 key.as_ptr() as *const c_char,
                 key.len(),
@@ -772,6 +713,12 @@ impl Borrow<[u8]> for PinnableSlice {
     }
 }

+impl From<PinnableSlice> for Vec<u8> {
+    fn from(value: PinnableSlice) -> Self {
+        value.to_vec()
+    }
+}
+
 pub struct Buffer {
     base: *mut u8,
     len: usize,
@@ -805,6 +752,12 @@ impl Borrow<[u8]> for Buffer {
     }
 }

+impl From<Buffer> for Vec<u8> {
+    fn from(value: Buffer) -> Self {
+        value.to_vec()
+    }
+}
+
 pub struct Iter {
     iter: *mut rocksdb_iterator_t,
     is_currently_valid: bool,
@@ -917,16 +870,3 @@ fn path_to_cstring(path: &Path) -> Result<CString> {
     )
     .map_err(invalid_input_error)
 }
-
-#[test]
-fn test_transaction_read_after_commit() -> Result<()> {
-    let db = Db::new(vec![])?;
-    let cf = db.column_family("default").unwrap();
-    let mut tr = db.build_transaction();
-    let reader = tr.reader();
-    tr.insert(&cf, b"test", b"foo")?;
-    assert_eq!(reader.get(&cf, b"test")?.as_deref(), Some(b"foo".as_ref()));
-    tr.commit()?;
-    assert!(reader.get(&cf, b"test").is_err());
-    Ok(())
-}

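The deleted test covered the behaviour that the new Weak-based reader now enforces structurally: a Reader taken from a Transaction only holds a weak handle, so once the transaction has been committed or rolled back the upgrade fails and reads return "The transaction is already ended" instead of touching a destroyed RocksDB transaction. A rough equivalent against the new API (again assuming a Db and ColumnFamily in scope):

    // Hedged sketch, not the removed test verbatim.
    fn read_your_writes(db: &Db, cf: &ColumnFamily) -> std::io::Result<()> {
        db.transaction(|mut t| {
            let reader = t.reader();
            t.insert(cf, b"test", b"foo")?;
            // Uncommitted writes are visible through the transaction's own reader.
            assert_eq!(reader.get(cf, b"test")?.as_deref(), Some(b"foo".as_ref()));
            Ok(())
        })
        // A Reader smuggled out of the closure would now fail instead of reading
        // freed state once the transaction has been committed and destroyed.
    }
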
@@ -7,6 +7,7 @@ use std::io::{Cursor, Read};
 use std::mem::size_of;
 use std::rc::Rc;

+#[cfg(not(target_arch = "wasm32"))]
 pub const LATEST_STORAGE_VERSION: u64 = 1;
 pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();

@@ -1,11 +1,13 @@
 use crate::error::invalid_data_error;
 use crate::model::{GraphNameRef, NamedOrBlankNodeRef, Quad, QuadRef, TermRef};
 use crate::storage::backend::{Reader, Transaction};
+#[cfg(not(target_arch = "wasm32"))]
+use crate::storage::binary_encoder::LATEST_STORAGE_VERSION;
 use crate::storage::binary_encoder::{
     decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
     write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
     write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding,
-    LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE,
+    WRITTEN_TERM_MAX_SIZE,
 };
 use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
 use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
@@ -15,8 +17,8 @@ use std::io::Result;
 #[cfg(not(target_arch = "wasm32"))]
 use std::mem::take;
 #[cfg(not(target_arch = "wasm32"))]
-use std::path::Path;
-use std::path::PathBuf;
+use std::path::{Path, PathBuf};
+#[cfg(not(target_arch = "wasm32"))]
 use std::thread::spawn;

 mod backend;
@@ -36,6 +38,7 @@ const DPOS_CF: &str = "dpos";
 const DOSP_CF: &str = "dosp";
 const GRAPHS_CF: &str = "graphs";
 const DEFAULT_CF: &str = "default";
+#[cfg(not(target_arch = "wasm32"))]
 const BULK_LOAD_BATCH_SIZE: usize = 1024 * 1024;

 /// Low level storage primitives
@@ -142,12 +145,18 @@ impl Storage {
             graphs_cf: db.column_family(GRAPHS_CF).unwrap(),
             db,
         };
-
-        let mut version = this.ensure_version()?;
+        #[cfg(not(target_arch = "wasm32"))]
+        this.migrate()?;
+        Ok(this)
+    }
+
+    #[cfg(not(target_arch = "wasm32"))]
+    fn migrate(&self) -> Result<()> {
+        let mut version = self.ensure_version()?;
         if version == 0 {
             // We migrate to v1
             let mut graph_names = HashSet::new();
-            for quad in this.reader().quads() {
+            for quad in self.snapshot().quads() {
                 let quad = quad?;
                 if !quad.graph_name.is_default_graph() {
                     graph_names.insert(quad.graph_name);
@@ -158,14 +167,14 @@ impl Storage {
                 .map(|g| encode_term(&g))
                 .collect::<Vec<_>>();
             graph_names.sort_unstable();
-            let mut stt_file = this.db.new_sst_file()?;
+            let mut stt_file = self.db.new_sst_file()?;
             for k in graph_names {
                 stt_file.insert_empty(&k)?;
             }
-            this.db
-                .insert_stt_files(vec![(&this.graphs_cf, stt_file.finish()?)])?;
+            self.db
+                .insert_stt_files(vec![(&self.graphs_cf, stt_file.finish()?)])?;
             version = 1;
-            this.update_version(version)?;
+            self.update_version(version)?;
         }

         match version {
@@ -173,7 +182,7 @@ impl Storage {
                 "The RocksDB database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version",
                 version
             ))),
-            LATEST_STORAGE_VERSION => Ok(this),
+            LATEST_STORAGE_VERSION => Ok(()),
             _ => Err(invalid_data_error(format!(
                 "The RocksDB database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database",
                 version
@@ -181,9 +190,10 @@
         }
     }

+    #[cfg(not(target_arch = "wasm32"))]
     fn ensure_version(&self) -> Result<u64> {
         Ok(
-            if let Some(version) = self.reader().reader.get(&self.default_cf, b"oxversion")? {
+            if let Some(version) = self.db.snapshot().get(&self.default_cf, b"oxversion")? {
                 let mut buffer = [0; 8];
                 buffer.copy_from_slice(&version);
                 u64::from_be_bytes(buffer)
@@ -194,21 +204,14 @@ impl Storage {
         )
     }

+    #[cfg(not(target_arch = "wasm32"))]
     fn update_version(&self, version: u64) -> Result<()> {
-        self.db
-            .transaction(|t| t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes()))?;
+        self.db.transaction(|mut t| {
+            t.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())
+        })?;
         self.db.flush(&self.default_cf)
     }

-    /// Unsafe reader (data might appear and disapear between two reads)
-    /// Use [`snapshot`] if you don't want that.
-    pub fn reader(&self) -> StorageReader {
-        StorageReader {
-            reader: self.db.reader(),
-            storage: self.clone(),
-        }
-    }
-
     pub fn snapshot(&self) -> StorageReader {
         StorageReader {
             reader: self.db.snapshot(),
@@ -216,7 +219,10 @@ impl Storage {
         }
     }

-    pub fn transaction<T>(&self, f: impl Fn(StorageWriter<'_>) -> Result<T>) -> Result<T> {
+    pub fn transaction<'a, 'b: 'a, T>(
+        &'b self,
+        f: impl Fn(StorageWriter<'a>) -> Result<T>,
+    ) -> Result<T> {
         self.db.transaction(|transaction| {
             f(StorageWriter {
                 buffer: Vec::new(),
@@ -579,7 +585,7 @@ impl StorageReader {
     pub fn get_str(&self, key: &StrHash) -> Result<Option<String>> {
         self.reader
             .get(&self.storage.id2str_cf, &key.to_be_bytes())?
-            .map(|v| String::from_utf8(v.to_vec()))
+            .map(|v| String::from_utf8(v.into()))
             .transpose()
             .map_err(invalid_data_error)
     }
@@ -674,7 +680,7 @@ impl StrLookup for StorageReader {

 pub struct StorageWriter<'a> {
     buffer: Vec<u8>,
-    transaction: &'a mut Transaction,
+    transaction: Transaction<'a>,
     storage: &'a Storage,
 }

@@ -181,19 +181,19 @@ impl Store {
     /// Checks if this store contains a given quad
     pub fn contains<'a>(&self, quad: impl Into<QuadRef<'a>>) -> io::Result<bool> {
         let quad = EncodedQuad::from(quad.into());
-        self.storage.reader().contains(&quad)
+        self.storage.snapshot().contains(&quad)
     }

     /// Returns the number of quads in the store
     ///
     /// Warning: this function executes a full scan
     pub fn len(&self) -> io::Result<usize> {
-        self.storage.reader().len()
+        self.storage.snapshot().len()
     }

     /// Returns if the store is empty
     pub fn is_empty(&self) -> io::Result<bool> {
-        self.storage.reader().is_empty()
+        self.storage.snapshot().is_empty()
     }

     /// Executes a [SPARQL 1.1 update](https://www.w3.org/TR/sparql11-update/).
@@ -487,7 +487,7 @@ impl Store {
         graph_name: impl Into<NamedOrBlankNodeRef<'a>>,
     ) -> io::Result<bool> {
         let graph_name = EncodedTerm::from(graph_name.into());
-        self.storage.reader().contains_named_graph(&graph_name)
+        self.storage.snapshot().contains_named_graph(&graph_name)
     }

     /// Inserts a graph into this store
@@ -684,6 +684,7 @@ impl Store {
     /// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
     /// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
     /// Errors related to data loading into the store use the other error kinds.
+    #[cfg(not(target_arch = "wasm32"))]
     pub fn bulk_load_graph<'a>(
         &mut self,
         reader: impl BufRead,
@@ -711,6 +712,7 @@ impl Store {
     /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
     ///
     /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
+    #[cfg(not(target_arch = "wasm32"))]
     pub fn bulk_extend(&mut self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
         bulk_load(&self.storage, quads.into_iter().map(Ok))
     }

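The two #[cfg] gates above keep the multi-threaded bulk loader native-only; on wasm32 the regular transactional API remains the way to fill a store. A hypothetical snippet (inside a function returning io::Result<()>, with a mutable store already opened):

    use oxigraph::model::{GraphName, NamedNode, Quad};

    let quad = Quad::new(
        NamedNode::new("http://example.com/s").unwrap(),
        NamedNode::new("http://example.com/p").unwrap(),
        NamedNode::new("http://example.com/o").unwrap(),
        GraphName::DefaultGraph,
    );
    // Works on every target, including wasm32: one transaction per call.
    store.insert(&quad)?;
    // Fast path that this commit keeps out of the wasm32 build.
    #[cfg(not(target_arch = "wasm32"))]
    store.bulk_extend(vec![quad])?;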