Use directly RocksDB C API

Will allow more customizations in the future
pull/171/head
Tpt 3 years ago
parent 063683088d
commit cb146efd7c
  1. 3
      lib/Cargo.toml
  2. 29
      lib/src/storage/mod.rs
  3. 480
      lib/src/storage/rocksdb_backend.rs
  4. 5
      lib/tests/store.rs

@ -43,7 +43,8 @@ json-event-parser = "0.1"
spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] } spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies] [target.'cfg(not(target_arch = "wasm32"))'.dependencies]
rocksdb = { version = "0.17", default-features = false } libc = "0.2"
librocksdb-sys = { version = "6.20.3", default-features = false }
oxhttp = { version = "0.1", optional = true } oxhttp = { version = "0.1", optional = true }
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]

@ -68,17 +68,17 @@ impl Storage {
fn setup(db: Db) -> std::io::Result<Self> { fn setup(db: Db) -> std::io::Result<Self> {
let this = Self { let this = Self {
id2str: db.open_tree(ID2STR_CF), id2str: db.open_tree(ID2STR_CF)?,
spog: db.open_tree(SPOG_CF), spog: db.open_tree(SPOG_CF)?,
posg: db.open_tree(POSG_CF), posg: db.open_tree(POSG_CF)?,
ospg: db.open_tree(OSPG_CF), ospg: db.open_tree(OSPG_CF)?,
gspo: db.open_tree(GSPO_CF), gspo: db.open_tree(GSPO_CF)?,
gpos: db.open_tree(GPOS_CF), gpos: db.open_tree(GPOS_CF)?,
gosp: db.open_tree(GOSP_CF), gosp: db.open_tree(GOSP_CF)?,
dspo: db.open_tree(DSPO_CF), dspo: db.open_tree(DSPO_CF)?,
dpos: db.open_tree(DPOS_CF), dpos: db.open_tree(DPOS_CF)?,
dosp: db.open_tree(DOSP_CF), dosp: db.open_tree(DOSP_CF)?,
graphs: db.open_tree(GRAPHS_CF), graphs: db.open_tree(GRAPHS_CF)?,
default: db, default: db,
}; };
@ -105,6 +105,7 @@ impl Storage {
this.id2str.insert(key, &new_value)?; this.id2str.insert(key, &new_value)?;
iter.next(); iter.next();
} }
iter.status()?;
version = 2; version = 2;
this.set_version(version)?; this.set_version(version)?;
this.default.flush()?; this.default.flush()?;
@ -712,6 +713,9 @@ impl Iterator for DecodingQuadIterator {
type Item = std::io::Result<EncodedQuad>; type Item = std::io::Result<EncodedQuad>;
fn next(&mut self) -> Option<std::io::Result<EncodedQuad>> { fn next(&mut self) -> Option<std::io::Result<EncodedQuad>> {
if let Err(e) = self.iter.status() {
return Some(Err(e));
}
let term = self.encoding.decode(self.iter.key()?); let term = self.encoding.decode(self.iter.key()?);
self.iter.next(); self.iter.next();
Some(term) Some(term)
@ -726,6 +730,9 @@ impl Iterator for DecodingGraphIterator {
type Item = std::io::Result<EncodedTerm>; type Item = std::io::Result<EncodedTerm>;
fn next(&mut self) -> Option<std::io::Result<EncodedTerm>> { fn next(&mut self) -> Option<std::io::Result<EncodedTerm>> {
if let Err(e) = self.iter.status() {
return Some(Err(e));
}
let term = decode_term(self.iter.key()?); let term = decode_term(self.iter.key()?);
self.iter.next(); self.iter.next();
Some(term) Some(term)

@ -1,71 +1,284 @@
use rocksdb::{ColumnFamily, DBPinnableSlice, DBRawIterator, Env, Error, Options, WriteBatch, DB}; //! Code inspired by [https://github.com/rust-rocksdb/rust-rocksdb][Rust RocksDB] under Apache License 2.0.
#![allow(unsafe_code)]
use crate::error::invalid_input_error;
use libc::{self, c_char, c_void};
use librocksdb_sys::*;
use std::borrow::Borrow;
use std::env::temp_dir; use std::env::temp_dir;
use std::io::{self, Result}; use std::ffi::{CStr, CString};
use std::mem::transmute; use std::io::{Error, ErrorKind, Result};
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::{ptr, slice};
macro_rules! ffi_result {
( $($function:ident)::*() ) => {
ffi_result_impl!($($function)::*())
};
( $($function:ident)::*( $arg1:expr $(, $arg:expr)* $(,)? ) ) => {
ffi_result_impl!($($function)::*($arg1 $(, $arg)* ,))
};
}
macro_rules! ffi_result_impl {
( $($function:ident)::*( $($arg:expr,)*) ) => {{
let mut err: *mut ::libc::c_char = ::std::ptr::null_mut();
let result = $($function)::*($($arg,)* &mut err);
if err.is_null() {
Ok(result)
} else {
Err(convert_error(err))
}
}}
}
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<DB>); pub struct Db(Arc<DbHandler>);
unsafe impl Send for Db {}
unsafe impl Sync for Db {}
struct DbHandler {
db: *mut rocksdb_t,
options: *mut rocksdb_options_t,
env: Option<*mut rocksdb_env_t>,
column_families: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
}
impl Drop for DbHandler {
fn drop(&mut self) {
unsafe {
rocksdb_close(self.db);
rocksdb_options_destroy(self.options);
if let Some(env) = self.env {
rocksdb_env_destroy(env);
}
}
}
}
impl Db { impl Db {
pub fn new(column_families: &[&str]) -> Result<Self> { pub fn new(column_families: &'static [&'static str]) -> Result<Self> {
//TODO: temp dir should not be useful let path = if cfg!(target_os = "linux") {
let temp_dir = if cfg!(target_os = "linux") { "/dev/shm/".into()
"/dev/shm/oxigraph-rocksdb".into()
} else { } else {
temp_dir().join("oxigraph-rocksdb-in-memory") temp_dir()
}; }
Ok(Self(Arc::new(Self::do_open( .join("oxigraph-temp-rocksdb");
&temp_dir, Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?)))
column_families,
true,
)?)))
} }
pub fn open(path: &Path, column_families: &[&str]) -> Result<Self> { pub fn open(path: &Path, column_families: &'static [&'static str]) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) Ok(Self(Arc::new(Self::do_open(path, column_families, false)?)))
} }
fn do_open(path: &Path, column_families: &[&str], mem_env: bool) -> Result<DB> { fn do_open(
let mut options = Options::default(); path: &Path,
options.create_if_missing(true); column_families: &'static [&'static str],
options.create_missing_column_families(true); in_memory: bool,
if mem_env { ) -> Result<DbHandler> {
options.set_env(&Env::mem_env().map_err(map_err)?); let c_path = CString::new(
path.to_str()
.ok_or_else(|| invalid_input_error("The DB path is not valid UTF-8"))?,
)
.map_err(invalid_input_error)?;
unsafe {
let options = rocksdb_options_create();
assert!(!options.is_null(), "rocksdb_options_create returned null");
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1);
let env = if in_memory {
let env = rocksdb_create_mem_env();
if env.is_null() {
rocksdb_options_destroy(options);
return Err(other_error("Not able to create an in-memory environment."));
}
rocksdb_options_set_env(options, env);
Some(env)
} else {
None
};
let mut column_families = column_families.to_vec();
if !column_families.contains(&"default") {
column_families.push("default")
}
let c_column_families = column_families
.iter()
.map(|cf| CString::new(*cf))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?;
let cf_options = column_families
.iter()
.map(|_| {
let options: *const rocksdb_options_t = rocksdb_options_create();
assert!(!options.is_null(), "rocksdb_options_create returned null");
options
})
.collect::<Vec<_>>();
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_families.len()];
let db = ffi_result!(rocksdb_open_column_families(
options,
c_path.as_ptr(),
column_families.len().try_into().unwrap(),
c_column_families
.iter()
.map(|cf| cf.as_ptr())
.collect::<Vec<_>>()
.as_ptr(),
cf_options.as_ptr(),
cf_handles.as_mut_ptr(),
))
.map_err(|e| {
rocksdb_options_destroy(options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
e
})?;
assert!(!db.is_null(), "rocksdb_create returned null");
for handle in &cf_handles {
if handle.is_null() {
rocksdb_close(db);
rocksdb_options_destroy(options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
return Err(other_error(
"Received null column family handle from RocksDB.",
));
}
}
Ok(DbHandler {
db,
options,
env,
column_families,
cf_handles,
})
} }
DB::open_cf(&options, path, column_families).map_err(map_err)
} }
pub fn open_tree(&self, name: &'static str) -> Tree { pub fn open_tree(&self, name: &'static str) -> Result<Tree> {
Tree { for (cf_name, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) {
db: self.0.clone(), if *cf_name == name {
cf_name: name, return Ok(Tree {
db: self.0.clone(),
cf_handle: *cf_handle,
});
}
} }
Err(other_error(format!(
"The column family {} does not exist",
name
)))
} }
pub fn flush(&self) -> Result<()> { pub fn flush(&self) -> Result<()> {
self.0.flush().map_err(map_err) unsafe {
let options = rocksdb_flushoptions_create();
assert!(
!options.is_null(),
"rocksdb_flushoptions_create returned null"
);
let r = ffi_result!(rocksdb_flush(self.0.db, options));
rocksdb_flushoptions_destroy(options);
r
}
} }
pub fn get(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> { pub fn get(&self, key: &[u8]) -> Result<Option<PinnableSlice<'_>>> {
self.0.get_pinned(key).map_err(map_err) unsafe {
let options = rocksdb_readoptions_create();
assert!(
!options.is_null(),
"rocksdb_readoptions_create returned null"
);
let r = ffi_result!(rocksdb_get_pinned(
self.0.db,
options,
key.as_ptr() as *const c_char,
key.len()
));
rocksdb_readoptions_destroy(options);
let slice = r?;
Ok(if slice.is_null() {
None
} else {
Some(PinnableSlice {
slice,
lifetime: PhantomData::default(),
})
})
}
} }
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.0.put(key, value).map_err(map_err) unsafe {
let options = rocksdb_writeoptions_create();
assert!(
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let r = ffi_result!(rocksdb_put(
self.0.db,
options,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
));
rocksdb_writeoptions_destroy(options);
r
}
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Tree { pub struct Tree {
db: Arc<DB>, db: Arc<DbHandler>,
cf_name: &'static str, cf_handle: *mut rocksdb_column_family_handle_t,
} }
unsafe impl Send for Tree {}
unsafe impl Sync for Tree {}
impl Tree { impl Tree {
pub fn get(&self, key: &[u8]) -> Result<Option<DBPinnableSlice<'_>>> { pub fn get(&self, key: &[u8]) -> Result<Option<PinnableSlice<'_>>> {
self.db.get_pinned_cf(self.get_cf(), key).map_err(map_err) unsafe {
let options = rocksdb_readoptions_create();
assert!(
!options.is_null(),
"rocksdb_readoptions_create returned null"
);
let r = ffi_result!(rocksdb_get_pinned_cf(
self.db.db,
options,
self.cf_handle,
key.as_ptr() as *const c_char,
key.len()
));
rocksdb_readoptions_destroy(options);
let slice = r?;
Ok(if slice.is_null() {
None
} else {
Some(PinnableSlice {
slice,
lifetime: PhantomData::default(),
})
})
}
} }
pub fn contains_key(&self, key: &[u8]) -> Result<bool> { pub fn contains_key(&self, key: &[u8]) -> Result<bool> {
@ -73,7 +286,24 @@ impl Tree {
} }
pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
self.db.put_cf(self.get_cf(), key, value).map_err(map_err) unsafe {
let options = rocksdb_writeoptions_create();
assert!(
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let r = ffi_result!(rocksdb_put_cf(
self.db.db,
options,
self.cf_handle,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
));
rocksdb_writeoptions_destroy(options);
r
}
} }
pub fn insert_empty(&self, key: &[u8]) -> Result<()> { pub fn insert_empty(&self, key: &[u8]) -> Result<()> {
@ -81,25 +311,72 @@ impl Tree {
} }
pub fn remove(&self, key: &[u8]) -> Result<()> { pub fn remove(&self, key: &[u8]) -> Result<()> {
self.db.delete_cf(self.get_cf(), key).map_err(map_err) unsafe {
let options = rocksdb_writeoptions_create();
assert!(
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let r = ffi_result!(rocksdb_delete_cf(
self.db.db,
options,
self.cf_handle,
key.as_ptr() as *const c_char,
key.len()
));
rocksdb_writeoptions_destroy(options);
r
}
} }
pub fn clear(&self) -> Result<()> { pub fn clear(&self) -> Result<()> {
let mut batch = WriteBatch::default(); unsafe {
batch.delete_range_cf(self.get_cf(), [].as_ref(), [u8::MAX; 257].as_ref()); let options = rocksdb_writeoptions_create();
self.db.write(batch).map_err(map_err) assert!(
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let start = [];
let end = [c_char::MAX; 257];
let r = ffi_result!(rocksdb_delete_range_cf(
self.db.db,
options,
self.cf_handle,
start.as_ptr(),
start.len(),
end.as_ptr(),
end.len(),
));
rocksdb_writeoptions_destroy(options);
r
}
} }
pub fn iter(&self) -> Iter { pub fn iter(&self) -> Iter {
self.scan_prefix(&[]) self.scan_prefix(&[])
} }
#[allow(unsafe_code)]
pub fn scan_prefix(&self, prefix: &[u8]) -> Iter { pub fn scan_prefix(&self, prefix: &[u8]) -> Iter {
let mut iter = self.db.raw_iterator_cf(self.get_cf()); unsafe {
iter.seek(&prefix); let options = rocksdb_readoptions_create();
// Safe because we clone the same DB from which we take an iterator assert!(
unsafe { Iter::new(iter, self.db.clone(), prefix.into()) } !options.is_null(),
"rocksdb_readoptions_create returned null"
);
let iter = rocksdb_create_iterator_cf(self.db.db, options, self.cf_handle);
assert!(!options.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() {
rocksdb_iter_seek_to_first(iter);
} else {
rocksdb_iter_seek(iter, prefix.as_ptr() as *const c_char, prefix.len());
}
Iter {
iter,
_options: options,
prefix: prefix.to_vec(),
_db: self.db.clone(),
}
}
} }
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
@ -113,54 +390,113 @@ impl Tree {
} }
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.iter().key().is_none() !self.iter().is_valid()
} }
}
pub struct PinnableSlice<'a> {
slice: *mut rocksdb_pinnableslice_t,
lifetime: PhantomData<&'a ()>,
}
impl<'a> Drop for PinnableSlice<'a> {
fn drop(&mut self) {
unsafe {
rocksdb_pinnableslice_destroy(self.slice);
}
}
}
impl<'a> Deref for PinnableSlice<'a> {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe {
let mut len = 0;
let val = rocksdb_pinnableslice_value(self.slice, &mut len);
slice::from_raw_parts(val as *const u8, len)
}
}
}
impl<'a> AsRef<[u8]> for PinnableSlice<'a> {
fn as_ref(&self) -> &[u8] {
&*self
}
}
#[allow(clippy::expect_used)] impl<'a> Borrow<[u8]> for PinnableSlice<'a> {
fn get_cf(&self) -> &ColumnFamily { fn borrow(&self) -> &[u8] {
self.db &*self
.cf_handle(self.cf_name)
.expect("A column family that should exist in RocksDB does not exist")
} }
} }
pub struct Iter { pub struct Iter {
iter: DBRawIterator<'static>, iter: *mut rocksdb_iterator_t,
prefix: Vec<u8>, prefix: Vec<u8>,
_db: Arc<DB>, // needed to ensure that DB still lives while iter is used _db: Arc<DbHandler>, // needed to ensure that DB still lives while iter is used
_options: *mut rocksdb_readoptions_t, // needed to ensure that options still lives while iter is used
} }
unsafe impl Send for Iter {}
unsafe impl Sync for Iter {}
impl Iter { impl Iter {
/// Creates a static iterator from a non static one by keeping a ARC reference to the database pub fn is_valid(&self) -> bool {
/// Caller must ensure that the iterator belongs to the same database unsafe {
/// if rocksdb_iter_valid(self.iter) == 0 {
/// This unsafe method is required to get static iterators and ease the usage of the library. return false;
#[allow(unsafe_code, clippy::useless_transmute)] }
unsafe fn new(iter: DBRawIterator<'_>, db: Arc<DB>, prefix: Vec<u8>) -> Self { let mut len = 0;
Self { let val = rocksdb_iter_key(self.iter, &mut len);
iter: transmute(iter), slice::from_raw_parts(val as *const u8, len).starts_with(&self.prefix)
prefix,
_db: db,
} }
} }
pub fn is_valid(&self) -> bool { pub fn status(&self) -> Result<()> {
self.iter.valid() unsafe { ffi_result!(rocksdb_iter_get_error(self.iter)) }
}
pub fn next(&mut self) {
unsafe {
rocksdb_iter_next(self.iter);
}
} }
pub fn key(&self) -> Option<&[u8]> { pub fn key(&self) -> Option<&[u8]> {
self.iter.key().filter(|k| k.starts_with(&self.prefix)) if self.is_valid() {
unsafe {
let mut len = 0;
let val = rocksdb_iter_key(self.iter, &mut len);
Some(slice::from_raw_parts(val as *const u8, len))
}
} else {
None
}
} }
pub fn value(&self) -> Option<&[u8]> { pub fn value(&self) -> Option<&[u8]> {
self.iter.value() if self.is_valid() {
unsafe {
let mut len = 0;
let val = rocksdb_iter_value(self.iter, &mut len);
Some(slice::from_raw_parts(val as *const u8, len))
}
} else {
None
}
} }
}
pub fn next(&mut self) { fn convert_error(ptr: *const c_char) -> Error {
self.iter.next() let message = unsafe {
} let s = CStr::from_ptr(ptr).to_string_lossy().into_owned();
libc::free(ptr as *mut c_void);
s
};
other_error(message)
} }
fn map_err(e: Error) -> io::Error { fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Error {
io::Error::new(io::ErrorKind::Other, e) Error::new(ErrorKind::InvalidInput, error)
} }

@ -138,7 +138,8 @@ fn test_dump_dataset() -> io::Result<()> {
#[test] #[test]
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
fn test_backward_compatibility() -> io::Result<()> { fn test_backward_compatibility() -> io::Result<()> {
{ // We run twice to check if data is properly saved and closed
for _ in 0..2 {
let store = Store::open("tests/rocksdb_bc_data")?; let store = Store::open("tests/rocksdb_bc_data")?;
for q in quads(GraphNameRef::DefaultGraph) { for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?); assert!(store.contains(q)?);
@ -153,7 +154,7 @@ fn test_backward_compatibility() -> io::Result<()> {
vec![NamedOrBlankNode::from(graph_name)], vec![NamedOrBlankNode::from(graph_name)],
store.named_graphs().collect::<io::Result<Vec<_>>>()? store.named_graphs().collect::<io::Result<Vec<_>>>()?
); );
}; }
reset_dir("tests/rocksdb_bc_data")?; reset_dir("tests/rocksdb_bc_data")?;
Ok(()) Ok(())
} }

Loading…
Cancel
Save