From cb146efd7cbbee6346fca4186c160188b27fbedf Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 23 Oct 2021 18:12:16 +0200 Subject: [PATCH] Use directly RocksDB C API Will allow more customizations in the future --- lib/Cargo.toml | 3 +- lib/src/storage/mod.rs | 29 +- lib/src/storage/rocksdb_backend.rs | 480 ++++++++++++++++++++++++----- lib/tests/store.rs | 5 +- 4 files changed, 431 insertions(+), 86 deletions(-) diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 4872a02c..9d1d3b3e 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -43,7 +43,8 @@ json-event-parser = "0.1" spargebra = { version = "0.1", path="../spargebra", features = ["rdf-star"] } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] -rocksdb = { version = "0.17", default-features = false } +libc = "0.2" +librocksdb-sys = { version = "6.20.3", default-features = false } oxhttp = { version = "0.1", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 518b22d4..d4d965cb 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -68,17 +68,17 @@ impl Storage { fn setup(db: Db) -> std::io::Result { let this = Self { - id2str: db.open_tree(ID2STR_CF), - spog: db.open_tree(SPOG_CF), - posg: db.open_tree(POSG_CF), - ospg: db.open_tree(OSPG_CF), - gspo: db.open_tree(GSPO_CF), - gpos: db.open_tree(GPOS_CF), - gosp: db.open_tree(GOSP_CF), - dspo: db.open_tree(DSPO_CF), - dpos: db.open_tree(DPOS_CF), - dosp: db.open_tree(DOSP_CF), - graphs: db.open_tree(GRAPHS_CF), + id2str: db.open_tree(ID2STR_CF)?, + spog: db.open_tree(SPOG_CF)?, + posg: db.open_tree(POSG_CF)?, + ospg: db.open_tree(OSPG_CF)?, + gspo: db.open_tree(GSPO_CF)?, + gpos: db.open_tree(GPOS_CF)?, + gosp: db.open_tree(GOSP_CF)?, + dspo: db.open_tree(DSPO_CF)?, + dpos: db.open_tree(DPOS_CF)?, + dosp: db.open_tree(DOSP_CF)?, + graphs: db.open_tree(GRAPHS_CF)?, default: db, }; @@ -105,6 +105,7 @@ impl Storage { this.id2str.insert(key, &new_value)?; iter.next(); } + iter.status()?; version = 2; this.set_version(version)?; this.default.flush()?; @@ -712,6 +713,9 @@ impl Iterator for DecodingQuadIterator { type Item = std::io::Result; fn next(&mut self) -> Option> { + if let Err(e) = self.iter.status() { + return Some(Err(e)); + } let term = self.encoding.decode(self.iter.key()?); self.iter.next(); Some(term) @@ -726,6 +730,9 @@ impl Iterator for DecodingGraphIterator { type Item = std::io::Result; fn next(&mut self) -> Option> { + if let Err(e) = self.iter.status() { + return Some(Err(e)); + } let term = decode_term(self.iter.key()?); self.iter.next(); Some(term) diff --git a/lib/src/storage/rocksdb_backend.rs b/lib/src/storage/rocksdb_backend.rs index b4f6fa71..1837c4f1 100644 --- a/lib/src/storage/rocksdb_backend.rs +++ b/lib/src/storage/rocksdb_backend.rs @@ -1,71 +1,284 @@ -use rocksdb::{ColumnFamily, DBPinnableSlice, DBRawIterator, Env, Error, Options, WriteBatch, DB}; +//! Code inspired by [https://github.com/rust-rocksdb/rust-rocksdb][Rust RocksDB] under Apache License 2.0. + +#![allow(unsafe_code)] + +use crate::error::invalid_input_error; +use libc::{self, c_char, c_void}; +use librocksdb_sys::*; +use std::borrow::Borrow; use std::env::temp_dir; -use std::io::{self, Result}; -use std::mem::transmute; +use std::ffi::{CStr, CString}; +use std::io::{Error, ErrorKind, Result}; +use std::marker::PhantomData; +use std::ops::Deref; use std::path::Path; use std::sync::Arc; +use std::{ptr, slice}; + +macro_rules! ffi_result { + ( $($function:ident)::*() ) => { + ffi_result_impl!($($function)::*()) + }; + + ( $($function:ident)::*( $arg1:expr $(, $arg:expr)* $(,)? ) ) => { + ffi_result_impl!($($function)::*($arg1 $(, $arg)* ,)) + }; +} + +macro_rules! ffi_result_impl { + ( $($function:ident)::*( $($arg:expr,)*) ) => {{ + let mut err: *mut ::libc::c_char = ::std::ptr::null_mut(); + let result = $($function)::*($($arg,)* &mut err); + if err.is_null() { + Ok(result) + } else { + Err(convert_error(err)) + } + }} +} #[derive(Clone)] -pub struct Db(Arc); +pub struct Db(Arc); + +unsafe impl Send for Db {} +unsafe impl Sync for Db {} + +struct DbHandler { + db: *mut rocksdb_t, + options: *mut rocksdb_options_t, + env: Option<*mut rocksdb_env_t>, + column_families: Vec<&'static str>, + cf_handles: Vec<*mut rocksdb_column_family_handle_t>, +} + +impl Drop for DbHandler { + fn drop(&mut self) { + unsafe { + rocksdb_close(self.db); + rocksdb_options_destroy(self.options); + if let Some(env) = self.env { + rocksdb_env_destroy(env); + } + } + } +} impl Db { - pub fn new(column_families: &[&str]) -> Result { - //TODO: temp dir should not be useful - let temp_dir = if cfg!(target_os = "linux") { - "/dev/shm/oxigraph-rocksdb".into() + pub fn new(column_families: &'static [&'static str]) -> Result { + let path = if cfg!(target_os = "linux") { + "/dev/shm/".into() } else { - temp_dir().join("oxigraph-rocksdb-in-memory") - }; - Ok(Self(Arc::new(Self::do_open( - &temp_dir, - column_families, - true, - )?))) + temp_dir() + } + .join("oxigraph-temp-rocksdb"); + Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) } - pub fn open(path: &Path, column_families: &[&str]) -> Result { + pub fn open(path: &Path, column_families: &'static [&'static str]) -> Result { Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) } - fn do_open(path: &Path, column_families: &[&str], mem_env: bool) -> Result { - let mut options = Options::default(); - options.create_if_missing(true); - options.create_missing_column_families(true); - if mem_env { - options.set_env(&Env::mem_env().map_err(map_err)?); + fn do_open( + path: &Path, + column_families: &'static [&'static str], + in_memory: bool, + ) -> Result { + let c_path = CString::new( + path.to_str() + .ok_or_else(|| invalid_input_error("The DB path is not valid UTF-8"))?, + ) + .map_err(invalid_input_error)?; + + unsafe { + let options = rocksdb_options_create(); + assert!(!options.is_null(), "rocksdb_options_create returned null"); + rocksdb_options_set_create_if_missing(options, 1); + rocksdb_options_set_create_missing_column_families(options, 1); + let env = if in_memory { + let env = rocksdb_create_mem_env(); + if env.is_null() { + rocksdb_options_destroy(options); + return Err(other_error("Not able to create an in-memory environment.")); + } + rocksdb_options_set_env(options, env); + Some(env) + } else { + None + }; + + let mut column_families = column_families.to_vec(); + if !column_families.contains(&"default") { + column_families.push("default") + } + let c_column_families = column_families + .iter() + .map(|cf| CString::new(*cf)) + .collect::, _>>() + .map_err(invalid_input_error)?; + let cf_options = column_families + .iter() + .map(|_| { + let options: *const rocksdb_options_t = rocksdb_options_create(); + assert!(!options.is_null(), "rocksdb_options_create returned null"); + options + }) + .collect::>(); + + let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = + vec![ptr::null_mut(); column_families.len()]; + let db = ffi_result!(rocksdb_open_column_families( + options, + c_path.as_ptr(), + column_families.len().try_into().unwrap(), + c_column_families + .iter() + .map(|cf| cf.as_ptr()) + .collect::>() + .as_ptr(), + cf_options.as_ptr(), + cf_handles.as_mut_ptr(), + )) + .map_err(|e| { + rocksdb_options_destroy(options); + if let Some(env) = env { + rocksdb_env_destroy(env); + } + e + })?; + assert!(!db.is_null(), "rocksdb_create returned null"); + for handle in &cf_handles { + if handle.is_null() { + rocksdb_close(db); + rocksdb_options_destroy(options); + if let Some(env) = env { + rocksdb_env_destroy(env); + } + return Err(other_error( + "Received null column family handle from RocksDB.", + )); + } + } + + Ok(DbHandler { + db, + options, + env, + column_families, + cf_handles, + }) } - DB::open_cf(&options, path, column_families).map_err(map_err) } - pub fn open_tree(&self, name: &'static str) -> Tree { - Tree { - db: self.0.clone(), - cf_name: name, + pub fn open_tree(&self, name: &'static str) -> Result { + for (cf_name, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) { + if *cf_name == name { + return Ok(Tree { + db: self.0.clone(), + cf_handle: *cf_handle, + }); + } } + Err(other_error(format!( + "The column family {} does not exist", + name + ))) } pub fn flush(&self) -> Result<()> { - self.0.flush().map_err(map_err) + unsafe { + let options = rocksdb_flushoptions_create(); + assert!( + !options.is_null(), + "rocksdb_flushoptions_create returned null" + ); + let r = ffi_result!(rocksdb_flush(self.0.db, options)); + rocksdb_flushoptions_destroy(options); + r + } } - pub fn get(&self, key: &[u8]) -> Result>> { - self.0.get_pinned(key).map_err(map_err) + pub fn get(&self, key: &[u8]) -> Result>> { + unsafe { + let options = rocksdb_readoptions_create(); + assert!( + !options.is_null(), + "rocksdb_readoptions_create returned null" + ); + let r = ffi_result!(rocksdb_get_pinned( + self.0.db, + options, + key.as_ptr() as *const c_char, + key.len() + )); + rocksdb_readoptions_destroy(options); + let slice = r?; + Ok(if slice.is_null() { + None + } else { + Some(PinnableSlice { + slice, + lifetime: PhantomData::default(), + }) + }) + } } pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.0.put(key, value).map_err(map_err) + unsafe { + let options = rocksdb_writeoptions_create(); + assert!( + !options.is_null(), + "rocksdb_writeoptions_create returned null" + ); + let r = ffi_result!(rocksdb_put( + self.0.db, + options, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + )); + rocksdb_writeoptions_destroy(options); + r + } } } #[derive(Clone)] pub struct Tree { - db: Arc, - cf_name: &'static str, + db: Arc, + cf_handle: *mut rocksdb_column_family_handle_t, } +unsafe impl Send for Tree {} +unsafe impl Sync for Tree {} + impl Tree { - pub fn get(&self, key: &[u8]) -> Result>> { - self.db.get_pinned_cf(self.get_cf(), key).map_err(map_err) + pub fn get(&self, key: &[u8]) -> Result>> { + unsafe { + let options = rocksdb_readoptions_create(); + assert!( + !options.is_null(), + "rocksdb_readoptions_create returned null" + ); + let r = ffi_result!(rocksdb_get_pinned_cf( + self.db.db, + options, + self.cf_handle, + key.as_ptr() as *const c_char, + key.len() + )); + rocksdb_readoptions_destroy(options); + let slice = r?; + Ok(if slice.is_null() { + None + } else { + Some(PinnableSlice { + slice, + lifetime: PhantomData::default(), + }) + }) + } } pub fn contains_key(&self, key: &[u8]) -> Result { @@ -73,7 +286,24 @@ impl Tree { } pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { - self.db.put_cf(self.get_cf(), key, value).map_err(map_err) + unsafe { + let options = rocksdb_writeoptions_create(); + assert!( + !options.is_null(), + "rocksdb_writeoptions_create returned null" + ); + let r = ffi_result!(rocksdb_put_cf( + self.db.db, + options, + self.cf_handle, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + )); + rocksdb_writeoptions_destroy(options); + r + } } pub fn insert_empty(&self, key: &[u8]) -> Result<()> { @@ -81,25 +311,72 @@ impl Tree { } pub fn remove(&self, key: &[u8]) -> Result<()> { - self.db.delete_cf(self.get_cf(), key).map_err(map_err) + unsafe { + let options = rocksdb_writeoptions_create(); + assert!( + !options.is_null(), + "rocksdb_writeoptions_create returned null" + ); + let r = ffi_result!(rocksdb_delete_cf( + self.db.db, + options, + self.cf_handle, + key.as_ptr() as *const c_char, + key.len() + )); + rocksdb_writeoptions_destroy(options); + r + } } pub fn clear(&self) -> Result<()> { - let mut batch = WriteBatch::default(); - batch.delete_range_cf(self.get_cf(), [].as_ref(), [u8::MAX; 257].as_ref()); - self.db.write(batch).map_err(map_err) + unsafe { + let options = rocksdb_writeoptions_create(); + assert!( + !options.is_null(), + "rocksdb_writeoptions_create returned null" + ); + let start = []; + let end = [c_char::MAX; 257]; + let r = ffi_result!(rocksdb_delete_range_cf( + self.db.db, + options, + self.cf_handle, + start.as_ptr(), + start.len(), + end.as_ptr(), + end.len(), + )); + rocksdb_writeoptions_destroy(options); + r + } } pub fn iter(&self) -> Iter { self.scan_prefix(&[]) } - #[allow(unsafe_code)] pub fn scan_prefix(&self, prefix: &[u8]) -> Iter { - let mut iter = self.db.raw_iterator_cf(self.get_cf()); - iter.seek(&prefix); - // Safe because we clone the same DB from which we take an iterator - unsafe { Iter::new(iter, self.db.clone(), prefix.into()) } + unsafe { + let options = rocksdb_readoptions_create(); + assert!( + !options.is_null(), + "rocksdb_readoptions_create returned null" + ); + let iter = rocksdb_create_iterator_cf(self.db.db, options, self.cf_handle); + assert!(!options.is_null(), "rocksdb_create_iterator returned null"); + if prefix.is_empty() { + rocksdb_iter_seek_to_first(iter); + } else { + rocksdb_iter_seek(iter, prefix.as_ptr() as *const c_char, prefix.len()); + } + Iter { + iter, + _options: options, + prefix: prefix.to_vec(), + _db: self.db.clone(), + } + } } pub fn len(&self) -> usize { @@ -113,54 +390,113 @@ impl Tree { } pub fn is_empty(&self) -> bool { - self.iter().key().is_none() + !self.iter().is_valid() } +} + +pub struct PinnableSlice<'a> { + slice: *mut rocksdb_pinnableslice_t, + lifetime: PhantomData<&'a ()>, +} + +impl<'a> Drop for PinnableSlice<'a> { + fn drop(&mut self) { + unsafe { + rocksdb_pinnableslice_destroy(self.slice); + } + } +} + +impl<'a> Deref for PinnableSlice<'a> { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + unsafe { + let mut len = 0; + let val = rocksdb_pinnableslice_value(self.slice, &mut len); + slice::from_raw_parts(val as *const u8, len) + } + } +} + +impl<'a> AsRef<[u8]> for PinnableSlice<'a> { + fn as_ref(&self) -> &[u8] { + &*self + } +} - #[allow(clippy::expect_used)] - fn get_cf(&self) -> &ColumnFamily { - self.db - .cf_handle(self.cf_name) - .expect("A column family that should exist in RocksDB does not exist") +impl<'a> Borrow<[u8]> for PinnableSlice<'a> { + fn borrow(&self) -> &[u8] { + &*self } } pub struct Iter { - iter: DBRawIterator<'static>, + iter: *mut rocksdb_iterator_t, prefix: Vec, - _db: Arc, // needed to ensure that DB still lives while iter is used + _db: Arc, // needed to ensure that DB still lives while iter is used + _options: *mut rocksdb_readoptions_t, // needed to ensure that options still lives while iter is used } +unsafe impl Send for Iter {} +unsafe impl Sync for Iter {} + impl Iter { - /// Creates a static iterator from a non static one by keeping a ARC reference to the database - /// Caller must ensure that the iterator belongs to the same database - /// - /// This unsafe method is required to get static iterators and ease the usage of the library. - #[allow(unsafe_code, clippy::useless_transmute)] - unsafe fn new(iter: DBRawIterator<'_>, db: Arc, prefix: Vec) -> Self { - Self { - iter: transmute(iter), - prefix, - _db: db, + pub fn is_valid(&self) -> bool { + unsafe { + if rocksdb_iter_valid(self.iter) == 0 { + return false; + } + let mut len = 0; + let val = rocksdb_iter_key(self.iter, &mut len); + slice::from_raw_parts(val as *const u8, len).starts_with(&self.prefix) } } - pub fn is_valid(&self) -> bool { - self.iter.valid() + pub fn status(&self) -> Result<()> { + unsafe { ffi_result!(rocksdb_iter_get_error(self.iter)) } + } + + pub fn next(&mut self) { + unsafe { + rocksdb_iter_next(self.iter); + } } pub fn key(&self) -> Option<&[u8]> { - self.iter.key().filter(|k| k.starts_with(&self.prefix)) + if self.is_valid() { + unsafe { + let mut len = 0; + let val = rocksdb_iter_key(self.iter, &mut len); + Some(slice::from_raw_parts(val as *const u8, len)) + } + } else { + None + } } pub fn value(&self) -> Option<&[u8]> { - self.iter.value() + if self.is_valid() { + unsafe { + let mut len = 0; + let val = rocksdb_iter_value(self.iter, &mut len); + Some(slice::from_raw_parts(val as *const u8, len)) + } + } else { + None + } } +} - pub fn next(&mut self) { - self.iter.next() - } +fn convert_error(ptr: *const c_char) -> Error { + let message = unsafe { + let s = CStr::from_ptr(ptr).to_string_lossy().into_owned(); + libc::free(ptr as *mut c_void); + s + }; + other_error(message) } -fn map_err(e: Error) -> io::Error { - io::Error::new(io::ErrorKind::Other, e) +fn other_error(error: impl Into>) -> Error { + Error::new(ErrorKind::InvalidInput, error) } diff --git a/lib/tests/store.rs b/lib/tests/store.rs index d7b9a73c..b43217bb 100644 --- a/lib/tests/store.rs +++ b/lib/tests/store.rs @@ -138,7 +138,8 @@ fn test_dump_dataset() -> io::Result<()> { #[test] #[cfg(not(target_arch = "wasm32"))] fn test_backward_compatibility() -> io::Result<()> { - { + // We run twice to check if data is properly saved and closed + for _ in 0..2 { let store = Store::open("tests/rocksdb_bc_data")?; for q in quads(GraphNameRef::DefaultGraph) { assert!(store.contains(q)?); @@ -153,7 +154,7 @@ fn test_backward_compatibility() -> io::Result<()> { vec![NamedOrBlankNode::from(graph_name)], store.named_graphs().collect::>>()? ); - }; + } reset_dir("tests/rocksdb_bc_data")?; Ok(()) }