@ -3,7 +3,7 @@
#![ allow(unsafe_code, trivial_casts) ]
use crate ::storage ::error ::StorageError ;
use crate ::store ::CorruptionError ;
use crate ::store ::{ CorruptionError , StoreOpenOptions } ;
use lazy_static ::lazy_static ;
use libc ::{ self , c_char , c_void , free } ;
use oxrocksdb_sys ::* ;
@ -25,6 +25,13 @@ use std::sync::Arc;
use std ::thread ::{ available_parallelism , yield_now } ;
use std ::{ ptr , slice } ;
#[ derive(PartialEq, Eq) ]
enum OpeningMode {
InMemory ,
Primary ,
Secondary ( PathBuf ) ,
}
macro_rules! ffi_result {
( $( $function :ident ) ::* ( ) ) = > {
ffi_result_impl ! ( $( $function ) ::* ( ) )
@ -69,6 +76,28 @@ lazy_static! {
} ;
}
fn primary_db_handler_or_error (
db : & InnerDbHandler ,
) -> Result < & InnerDbHandlerPrimary , StorageError > {
match db {
InnerDbHandler ::Primary ( inner_db ) | InnerDbHandler ::InMemory ( inner_db ) = > Ok ( inner_db ) ,
_ = > Err ( StorageError ::Other (
"Database is opened as secondary, can not execute this operation." . into ( ) ,
) ) ,
}
}
fn secondary_db_handler_or_error (
db : & InnerDbHandler ,
) -> Result < & InnerDbHandlerSecondary , StorageError > {
match db {
InnerDbHandler ::Secondary ( inner_db ) = > Ok ( inner_db ) ,
_ = > Err ( StorageError ::Other ( Box ::< dyn Error + Send + Sync > ::from (
"Expected secondary InnerDbHandler, got primary/in memory one" ,
) ) ) ,
}
}
pub struct ColumnFamilyDefinition {
pub name : & ' static str ,
pub use_iter : bool ,
@ -84,11 +113,44 @@ unsafe impl Send for Db {}
unsafe impl Sync for Db { }
struct DbHandler {
/// Handler that has read only access to the database
struct InnerDbHandlerSecondary {
db : * mut rocksdb_t ,
primary_path : PathBuf ,
}
/// Handler that has read and write access to the database
struct InnerDbHandlerPrimary {
db : * mut rocksdb_transactiondb_t ,
options : * mut rocksdb_options_t ,
transaction_options : * mut rocksdb_transaction_options_t ,
transactiondb_options : * mut rocksdb_transactiondb_options_t ,
path : PathBuf ,
}
enum InnerDbHandler {
Primary ( InnerDbHandlerPrimary ) ,
InMemory ( InnerDbHandlerPrimary ) ,
Secondary ( InnerDbHandlerSecondary ) ,
}
impl InnerDbHandler {
fn is_null ( & self ) -> bool {
match self {
Self ::Primary ( s ) | Self ::InMemory ( s ) = > s . db . is_null ( ) ,
Self ::Secondary ( s ) = > s . db . is_null ( ) ,
}
}
fn primary_path ( & self ) -> & PathBuf {
match self {
Self ::Primary ( s ) | Self ::InMemory ( s ) = > & s . path ,
Self ::Secondary ( s ) = > & s . primary_path ,
}
}
}
struct DbHandler {
db : InnerDbHandler ,
options : * mut rocksdb_options_t ,
read_options : * mut rocksdb_readoptions_t ,
write_options : * mut rocksdb_writeoptions_t ,
flush_options : * mut rocksdb_flushoptions_t ,
@ -99,8 +161,6 @@ struct DbHandler {
column_family_names : Vec < & ' static str > ,
cf_handles : Vec < * mut rocksdb_column_family_handle_t > ,
cf_options : Vec < * mut rocksdb_options_t > ,
path : PathBuf ,
in_memory : bool ,
}
impl Drop for DbHandler {
@ -109,7 +169,14 @@ impl Drop for DbHandler {
for cf_handle in & self . cf_handles {
rocksdb_column_family_handle_destroy ( * cf_handle ) ;
}
rocksdb_transactiondb_close ( self . db ) ;
match & self . db {
InnerDbHandler ::Primary ( s ) | InnerDbHandler ::InMemory ( s ) = > {
rocksdb_transactiondb_close ( s . db ) ;
}
InnerDbHandler ::Secondary ( s ) = > {
rocksdb_close ( s . db ) ;
}
}
for cf_option in & self . cf_options {
rocksdb_options_destroy ( * cf_option ) ;
}
@ -119,13 +186,19 @@ impl Drop for DbHandler {
rocksdb_envoptions_destroy ( self . env_options ) ;
rocksdb_ingestexternalfileoptions_destroy ( self . ingest_external_file_options ) ;
rocksdb_compactoptions_destroy ( self . compaction_options ) ;
rocksdb_transaction_options_destroy ( self . transaction_options ) ;
rocksdb_transactiondb_options_destroy ( self . transactiondb_options ) ;
if let InnerDbHandler ::Primary ( inner_db ) | InnerDbHandler ::InMemory ( inner_db ) = & self . db
{
rocksdb_transaction_options_destroy ( inner_db . transaction_options ) ;
rocksdb_transactiondb_options_destroy ( inner_db . transactiondb_options ) ;
}
rocksdb_options_destroy ( self . options ) ;
rocksdb_block_based_options_destroy ( self . block_based_table_options ) ;
}
if self . in_memory & & self . path . exists ( ) {
remove_dir_all ( & self . path ) . unwrap ( ) ;
if let InnerDbHandler ::InMemory ( db ) = & self . db {
if db . path . exists ( ) {
remove_dir_all ( & db . path ) . unwrap ( ) ;
}
}
}
}
@ -138,7 +211,11 @@ impl Db {
temp_dir ( )
}
. join ( format! ( "oxigraph-rocksdb-{}" , random ::< u128 > ( ) ) ) ;
Ok ( Self ( Arc ::new ( Self ::do_open ( path , column_families , true ) ? ) ) )
Ok ( Self ( Arc ::new ( Self ::do_open (
path ,
column_families ,
& OpeningMode ::InMemory ,
) ? ) ) )
}
pub fn open (
@ -148,17 +225,29 @@ impl Db {
Ok ( Self ( Arc ::new ( Self ::do_open (
path . to_owned ( ) ,
column_families ,
false ,
& OpeningMode ::Primary ,
) ? ) ) )
}
pub fn open_with_options (
options : StoreOpenOptions ,
column_families : Vec < ColumnFamilyDefinition > ,
) -> Result < Self , StorageError > {
match options {
StoreOpenOptions ::OpenAsSecondary ( options ) = > Ok ( Self ( Arc ::new ( Self ::do_open (
options . path ,
column_families ,
& OpeningMode ::Secondary ( options . secondary_path ) ,
) ? ) ) ) ,
}
}
fn do_open (
path : PathBuf ,
mut column_families : Vec < ColumnFamilyDefinition > ,
in_memory : bool ,
open_mode : & OpeningMode ,
) -> Result < DbHandler , StorageError > {
let c_path = path_to_cstring ( & path ) ? ;
unsafe {
let options = rocksdb_options_create ( ) ;
assert! ( ! options . is_null ( ) , "rocksdb_options_create returned null" ) ;
@ -170,28 +259,32 @@ impl Db {
available_parallelism ( ) ? . get ( ) . try_into ( ) . unwrap ( ) ,
) ;
if let Some ( available_fd ) = available_file_descriptors ( ) ? {
let max_open_files = match & open_mode {
OpeningMode ::Primary | OpeningMode ::InMemory = > {
if available_fd < 96 {
rocksdb_options_destroy ( options ) ;
return Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
format! (
"Oxigraph needs at least 96 file descriptors, only {available_fd} allowed. Run e.g. `ulimit -n 512` to allow 512 opened files"
" Oxigraph needs at least 96 file descriptors , \
only { available_fd } allowed . \
Run e . g . ` ulimit - n 512 ` to allow 512 opened files "
) ,
)
. into ( ) ) ;
}
rocksdb_options_set_max_open_files (
options ,
( available_fd - 48 ) . try_into ( ) . unwrap ( ) ,
) ;
( available_fd - 48 ) . try_into ( ) . unwrap ( )
}
OpeningMode ::Secondary ( _ ) = > - 1 ,
} ;
rocksdb_options_set_max_open_files ( options , max_open_files ) ;
}
rocksdb_options_set_info_log_level ( options , 2 ) ; // We only log warnings
rocksdb_options_set_max_log_file_size ( options , 1024 * 1024 ) ; // Only 1MB log size
rocksdb_options_set_recycle_log_file_num ( options , 10 ) ; // We do not keep more than 10 log files
rocksdb_options_set_compression (
options ,
if in_m emory {
if open_mode = = & OpeningMode ::InM emory {
rocksdb_no_compression
} else {
rocksdb_lz4_compression
@ -201,7 +294,7 @@ impl Db {
) ;
rocksdb_options_set_env (
options ,
if in_m emory {
if open_mode = = & OpeningMode ::InM emory {
ROCKSDB_MEM_ENV . 0
} else {
ROCKSDB_ENV . 0
@ -219,12 +312,6 @@ impl Db {
) ;
rocksdb_options_set_block_based_table_factory ( options , block_based_table_options ) ;
let transactiondb_options = rocksdb_transactiondb_options_create ( ) ;
assert! (
! transactiondb_options . is_null ( ) ,
"rocksdb_transactiondb_options_create returned null"
) ;
if ! column_families . iter ( ) . any ( | c | c . name = = "default" ) {
column_families . push ( ColumnFamilyDefinition {
name : "default" ,
@ -261,24 +348,76 @@ impl Db {
let mut cf_handles : Vec < * mut rocksdb_column_family_handle_t > =
vec! [ ptr ::null_mut ( ) ; column_family_names . len ( ) ] ;
let db = ffi_result ! ( rocksdb_transactiondb_open_column_families_with_status (
let c_num_column_families = c_column_families . len ( ) . try_into ( ) . unwrap ( ) ;
let c_column_family_names = c_column_families
. iter ( )
. map ( | cf | cf . as_ptr ( ) )
. collect ::< Vec < _ > > ( ) ;
let db = match & open_mode {
OpeningMode ::Primary | OpeningMode ::InMemory = > {
let transactiondb_options = rocksdb_transactiondb_options_create ( ) ;
assert! (
! transactiondb_options . is_null ( ) ,
"rocksdb_transactiondb_options_create returned null"
) ;
ffi_result ! ( rocksdb_transactiondb_open_column_families_with_status (
options ,
transactiondb_options ,
c_path . as_ptr ( ) ,
c_column_families . len ( ) . try_into ( ) . unwrap ( ) ,
c_column_families
. iter ( )
. map ( | cf | cf . as_ptr ( ) )
. collect ::< Vec < _ > > ( )
. as_ptr ( ) ,
c_num_column_families ,
c_column_family_names . as_ptr ( ) ,
cf_options . as_ptr ( ) as * const * const rocksdb_options_t ,
cf_handles . as_mut_ptr ( ) ,
) )
. map ( | db | {
let transaction_options = rocksdb_transaction_options_create ( ) ;
assert! (
! transaction_options . is_null ( ) ,
"rocksdb_transaction_options_create returned null"
) ;
rocksdb_transaction_options_set_set_snapshot ( transaction_options , 1 ) ;
let primary_db_handler = InnerDbHandlerPrimary {
db ,
transaction_options ,
transactiondb_options ,
path ,
} ;
match open_mode {
OpeningMode ::InMemory = > InnerDbHandler ::InMemory ( primary_db_handler ) ,
OpeningMode ::Primary = > InnerDbHandler ::Primary ( primary_db_handler ) ,
_ = > unreachable! ( ) ,
}
} )
. map_err ( | e | {
rocksdb_transactiondb_options_destroy ( transactiondb_options ) ;
e
} )
}
OpeningMode ::Secondary ( secondary_path ) = > {
ffi_result ! ( rocksdb_open_as_secondary_column_families_with_status (
options ,
c_path . as_ptr ( ) ,
path_to_cstring ( secondary_path ) ? . as_ptr ( ) ,
c_num_column_families ,
c_column_family_names . as_ptr ( ) ,
cf_options . as_ptr ( ) as * const * const rocksdb_options_t ,
cf_handles . as_mut_ptr ( ) ,
) )
. map ( | db | {
InnerDbHandler ::Secondary ( InnerDbHandlerSecondary {
db ,
primary_path : path ,
} )
} )
}
}
. map_err ( | e | {
for cf_option in & cf_options {
rocksdb_options_destroy ( * cf_option ) ;
}
rocksdb_transactiondb_options_destroy ( transactiondb_options ) ;
rocksdb_options_destroy ( options ) ;
rocksdb_block_based_options_destroy ( block_based_table_options ) ;
e
@ -302,7 +441,7 @@ impl Db {
! write_options . is_null ( ) ,
"rocksdb_writeoptions_create returned null"
) ;
if in_m emory {
if open_mode = = & OpeningMode ::InM emory {
rocksdb_writeoptions_disable_WAL ( write_options , 1 ) ; // No need for WAL
}
@ -330,18 +469,9 @@ impl Db {
"rocksdb_compactoptions_create returned null"
) ;
let transaction_options = rocksdb_transaction_options_create ( ) ;
assert! (
! transaction_options . is_null ( ) ,
"rocksdb_transaction_options_create returned null"
) ;
rocksdb_transaction_options_set_set_snapshot ( transaction_options , 1 ) ;
Ok ( DbHandler {
db ,
options ,
transaction_options ,
transactiondb_options ,
read_options ,
write_options ,
flush_options ,
@ -352,8 +482,6 @@ impl Db {
column_family_names ,
cf_handles ,
cf_options ,
path ,
in_memory ,
} )
}
}
@ -370,12 +498,15 @@ impl Db {
#[ must_use ]
pub fn snapshot ( & self ) -> Reader {
unsafe {
let snapshot = rocksdb_transactiondb_create_snapshot ( self . 0. db ) ;
let options = rocksdb_readoptions_create_copy ( self . 0. read_options ) ;
match & self . 0. db {
InnerDbHandler ::InMemory ( inner_db_handler )
| InnerDbHandler ::Primary ( inner_db_handler ) = > {
let snapshot = rocksdb_transactiondb_create_snapshot ( inner_db_handler . db ) ;
assert! (
! snapshot . is_null ( ) ,
"rocksdb_transactiondb_create_snapshot returned null"
) ;
let options = rocksdb_readoptions_create_copy ( self . 0. read_options ) ;
rocksdb_readoptions_set_snapshot ( options , snapshot ) ;
Reader {
inner : InnerReader ::Snapshot ( Rc ::new ( InnerSnapshot {
@ -385,6 +516,19 @@ impl Db {
options ,
}
}
InnerDbHandler ::Secondary ( _ ) = > {
ffi_result ! ( rocksdb_try_catch_up_with_primary_with_status (
secondary_db_handler_or_error ( & self . 0. db ) . unwrap ( ) . db ,
) )
. unwrap ( ) ;
Reader {
inner : InnerReader ::Secondary ( self . 0. clone ( ) ) ,
options ,
}
}
}
}
}
pub fn transaction < ' a , ' b : ' a , T , E : Error + ' static + From < StorageError > > (
@ -393,10 +537,11 @@ impl Db {
) -> Result < T , E > {
loop {
let transaction = unsafe {
let db = primary_db_handler_or_error ( & self . 0. db ) ? ;
let transaction = rocksdb_transaction_begin (
self . 0 .db ,
db . db ,
self . 0. write_options ,
self . 0 .transaction_options ,
db . transaction_options ,
ptr ::null_mut ( ) ,
) ;
assert! (
@ -464,13 +609,27 @@ impl Db {
key : & [ u8 ] ,
) -> Result < Option < PinnableSlice > , StorageError > {
unsafe {
let slice = ffi_result ! ( rocksdb_transactiondb_get_pinned_cf_with_status (
self . 0. db ,
let slice = match & self . 0. db {
InnerDbHandler ::Secondary ( inner_db_handler ) = > {
ffi_result ! ( rocksdb_get_pinned_cf_with_status (
inner_db_handler . db ,
self . 0. read_options ,
column_family . 0 ,
key . as_ptr ( ) as * const c_char ,
key . len ( ) ,
) ) ?
}
InnerDbHandler ::Primary ( inner_db_handler )
| InnerDbHandler ::InMemory ( inner_db_handler ) = > {
ffi_result ! ( rocksdb_transactiondb_get_pinned_cf_with_status (
inner_db_handler . db ,
self . 0. read_options ,
column_family . 0 ,
key . as_ptr ( ) as * const c_char ,
key . len ( )
) ) ? ;
) ) ?
}
} ;
Ok ( if slice . is_null ( ) {
None
} else {
@ -495,7 +654,7 @@ impl Db {
) -> Result < ( ) , StorageError > {
unsafe {
ffi_result ! ( rocksdb_transactiondb_put_cf_with_status (
self . 0. db ,
primary_db_handler_or_error ( & self . 0. db ) ? . db ,
self . 0. write_options ,
column_family . 0 ,
key . as_ptr ( ) as * const c_char ,
@ -510,7 +669,7 @@ impl Db {
pub fn flush ( & self , column_family : & ColumnFamily ) -> Result < ( ) , StorageError > {
unsafe {
ffi_result ! ( rocksdb_transactiondb_flush_cf_with_status (
self . 0. db ,
primary_db_handler_or_error ( & self . 0. db ) ? . db ,
self . 0. flush_options ,
column_family . 0 ,
) ) ? ;
@ -522,7 +681,7 @@ impl Db {
pub fn compact ( & self , column_family : & ColumnFamily ) -> Result < ( ) , StorageError > {
unsafe {
ffi_result ! ( rocksdb_transactiondb_compact_range_cf_opt_with_status (
self . 0. db ,
primary_db_handler_or_error ( & self . 0. db ) ? . db ,
column_family . 0 ,
self . 0. compaction_options ,
ptr ::null ( ) ,
@ -536,7 +695,7 @@ impl Db {
pub fn new_sst_file ( & self ) -> Result < SstFileWriter , StorageError > {
unsafe {
let path = self . 0. path . join ( random ::< u128 > ( ) . to_string ( ) ) ;
let path = self . 0. db . primary_ path( ) . join ( random ::< u128 > ( ) . to_string ( ) ) ;
let writer = rocksdb_sstfilewriter_create ( self . 0. env_options , self . 0. options ) ;
ffi_result ! ( rocksdb_sstfilewriter_open_with_status (
writer ,
@ -576,7 +735,7 @@ impl Db {
. collect ::< Vec < _ > > ( ) ;
unsafe {
ffi_result ! ( rocksdb_transactiondb_ingest_external_files_with_status (
self . 0. db ,
primary_db_handler_or_error ( & self . 0. db ) ? . db ,
args . as_ptr ( ) ,
args . len ( )
) ) ? ;
@ -585,21 +744,26 @@ impl Db {
}
pub fn backup ( & self , target_directory : & Path ) -> Result < ( ) , StorageError > {
if self . 0. in_memory {
return Err ( StorageError ::Other (
"It is not possible to backup an in-memory database created with `Store::open`"
match & self . 0. db {
InnerDbHandler ::Secondary ( _ ) = > Err ( StorageError ::Other (
"It is not possible to backup an database opened as secondary." . into ( ) ,
) ) ,
InnerDbHandler ::InMemory ( _ ) = > Err ( StorageError ::Other (
"It is not possible to backup an in-memory database created with `Store::new`"
. into ( ) ,
) ) ;
}
) ) ,
InnerDbHandler ::Primary ( db ) = > {
let path = path_to_cstring ( target_directory ) ? ;
unsafe {
ffi_result ! ( rocksdb_transactiondb_create_checkpoint_with_status (
self . 0 .db ,
db . db ,
path . as_ptr ( )
) ) ? ;
}
Ok ( ( ) )
}
}
}
}
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
@ -619,16 +783,24 @@ pub struct Reader {
enum InnerReader {
Snapshot ( Rc < InnerSnapshot > ) ,
Transaction ( Weak < * mut rocksdb_transaction_t > ) ,
Secondary ( Arc < DbHandler > ) , // TODO: we should not hold the whole enum, because we have to use
// secondary_db_handler_or_error to access rocksdb_t. we should
// safe it directly?!
}
struct InnerSnapshot {
db : Arc < DbHandler > ,
db : Arc < DbHandler > , // TODO: probably the same as InnerReader::Secondary
snapshot : * const rocksdb_snapshot_t ,
}
impl Drop for InnerSnapshot {
fn drop ( & mut self ) {
unsafe { rocksdb_transactiondb_release_snapshot ( self . db . db , self . snapshot ) }
unsafe {
rocksdb_transactiondb_release_snapshot (
primary_db_handler_or_error ( & self . db . db ) . unwrap ( ) . db ,
self . snapshot ,
)
}
}
}
@ -657,7 +829,7 @@ impl Reader {
let slice = match & self . inner {
InnerReader ::Snapshot ( inner ) = > {
ffi_result ! ( rocksdb_transactiondb_get_pinned_cf_with_status (
inner . db . db ,
primary_db_handler_or_error ( & inner . db . db ) ? . db ,
self . options ,
column_family . 0 ,
key . as_ptr ( ) as * const c_char ,
@ -679,6 +851,11 @@ impl Reader {
) ) ;
}
}
InnerReader ::Secondary ( _ ) = > {
return Err ( StorageError ::Other (
"Database is opened as secondary, can not execute Reader.get" . into ( ) ,
) )
}
} ? ;
Ok ( if slice . is_null ( ) {
None
@ -737,9 +914,11 @@ impl Reader {
) ;
}
let iter = match & self . inner {
InnerReader ::Snapshot ( inner ) = > {
rocksdb_transactiondb_create_iterator_cf ( inner . db . db , options , column_family . 0 )
}
InnerReader ::Snapshot ( inner ) = > rocksdb_transactiondb_create_iterator_cf (
primary_db_handler_or_error ( & inner . db . db ) ? . db ,
options ,
column_family . 0 ,
) ,
InnerReader ::Transaction ( inner ) = > {
if let Some ( inner ) = inner . upgrade ( ) {
rocksdb_transaction_create_iterator_cf ( * inner , options , column_family . 0 )
@ -749,6 +928,10 @@ impl Reader {
) ) ;
}
}
InnerReader ::Secondary ( inner ) = > {
let db = secondary_db_handler_or_error ( & inner . db ) ? . db ;
rocksdb_create_iterator_cf ( db , options , column_family . 0 )
}
} ;
assert! ( ! iter . is_null ( ) , "rocksdb_create_iterator returned null" ) ;
if prefix . is_empty ( ) {