@ -1,9 +1,9 @@
#![ allow(clippy::same_name_method) ]
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use crate ::model ::Quad ;
use crate ::model ::{ GraphNameRef , NamedOrBlankNodeRef , QuadRef , TermRef } ;
use crate ::storage ::backend ::{ Reader , Transaction } ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use crate ::storage ::binary_encoder ::LATEST_STORAGE_VERSION ;
use crate ::storage ::binary_encoder ::{
decode_term , encode_term , encode_term_pair , encode_term_quad , encode_term_triple ,
@ -12,22 +12,22 @@ use crate::storage::binary_encoder::{
WRITTEN_TERM_MAX_SIZE ,
} ;
pub use crate ::storage ::error ::{ CorruptionError , LoaderError , SerializerError , StorageError } ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use crate ::storage ::numeric_encoder ::Decoder ;
use crate ::storage ::numeric_encoder ::{ insert_term , EncodedQuad , EncodedTerm , StrHash , StrLookup } ;
use backend ::{ ColumnFamily , ColumnFamilyDefinition , Db , Iter } ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::collections ::VecDeque ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::collections ::{ HashMap , HashSet } ;
use std ::error ::Error ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::mem ::{ swap , take } ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::path ::{ Path , PathBuf } ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::sync ::Mutex ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
use std ::{ io , thread } ;
mod backend ;
@ -47,16 +47,16 @@ const DSPO_CF: &str = "dspo";
const DPOS_CF : & str = "dpos" ;
const DOSP_CF : & str = "dosp" ;
const GRAPHS_CF : & str = "graphs" ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
const DEFAULT_CF : & str = "default" ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
const DEFAULT_BULK_LOAD_BATCH_SIZE : usize = 1_000_000 ;
/// Low level storage primitives
#[ derive(Clone) ]
pub struct Storage {
db : Db ,
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
default_cf : ColumnFamily ,
id2str_cf : ColumnFamily ,
spog_cf : ColumnFamily ,
@ -76,12 +76,12 @@ impl Storage {
Self ::setup ( Db ::new ( Self ::column_families ( ) ) ? )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn open ( path : & Path ) -> Result < Self , StorageError > {
Self ::setup ( Db ::open_read_write ( Some ( path ) , Self ::column_families ( ) ) ? )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn open_secondary ( primary_path : & Path ) -> Result < Self , StorageError > {
Self ::setup ( Db ::open_secondary (
primary_path ,
@ -90,7 +90,7 @@ impl Storage {
) ? )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn open_persistent_secondary (
primary_path : & Path ,
secondary_path : & Path ,
@ -102,7 +102,7 @@ impl Storage {
) ? )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn open_read_only ( path : & Path ) -> Result < Self , StorageError > {
Self ::setup ( Db ::open_read_only ( path , Self ::column_families ( ) ) ? )
}
@ -180,7 +180,7 @@ impl Storage {
fn setup ( db : Db ) -> Result < Self , StorageError > {
let this = Self {
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
default_cf : db . column_family ( DEFAULT_CF ) ? ,
id2str_cf : db . column_family ( ID2STR_CF ) ? ,
spog_cf : db . column_family ( SPOG_CF ) ? ,
@ -195,12 +195,12 @@ impl Storage {
graphs_cf : db . column_family ( GRAPHS_CF ) ? ,
db ,
} ;
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
this . migrate ( ) ? ;
Ok ( this )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
fn migrate ( & self ) -> Result < ( ) , StorageError > {
let mut version = self . ensure_version ( ) ? ;
if version = = 0 {
@ -240,7 +240,7 @@ impl Storage {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
fn ensure_version ( & self ) -> Result < u64 , StorageError > {
Ok (
if let Some ( version ) = self . db . get ( & self . default_cf , b" oxversion " ) ? {
@ -254,7 +254,7 @@ impl Storage {
)
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
fn update_version ( & self , version : u64 ) -> Result < ( ) , StorageError > {
self . db
. insert ( & self . default_cf , b" oxversion " , & version . to_be_bytes ( ) ) ? ;
@ -281,12 +281,12 @@ impl Storage {
} )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn flush ( & self ) -> Result < ( ) , StorageError > {
self . db . flush ( )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn compact ( & self ) -> Result < ( ) , StorageError > {
self . db . compact ( & self . default_cf ) ? ;
self . db . compact ( & self . gspo_cf ) ? ;
@ -301,7 +301,7 @@ impl Storage {
self . db . compact ( & self . id2str_cf )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn backup ( & self , target_directory : & Path ) -> Result < ( ) , StorageError > {
self . db . backup ( target_directory )
}
@ -626,7 +626,7 @@ impl StorageReader {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn get_str ( & self , key : & StrHash ) -> Result < Option < String > , StorageError > {
Ok ( self
. storage
@ -637,7 +637,7 @@ impl StorageReader {
. map_err ( CorruptionError ::new ) ? )
}
#[ cfg(target_family = " wasm " ) ]
#[ cfg(any( target_family = " wasm " , not(feature = " rocksdb " )) )]
pub fn get_str ( & self , key : & StrHash ) -> Result < Option < String > , StorageError > {
Ok ( self
. reader
@ -647,21 +647,21 @@ impl StorageReader {
. map_err ( CorruptionError ::new ) ? )
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn contains_str ( & self , key : & StrHash ) -> Result < bool , StorageError > {
self . storage
. db
. contains_key ( & self . storage . id2str_cf , & key . to_be_bytes ( ) )
}
#[ cfg(target_family = " wasm " ) ]
#[ cfg(any( target_family = " wasm " , not(feature = " rocksdb " )) )]
pub fn contains_str ( & self , key : & StrHash ) -> Result < bool , StorageError > {
self . reader
. contains_key ( & self . storage . id2str_cf , & key . to_be_bytes ( ) )
}
/// Validates that all the storage invariants held in the data
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
pub fn validate ( & self ) -> Result < ( ) , StorageError > {
// triples
let dspo_size = self . dspo_quads ( & [ ] ) . count ( ) ;
@ -773,7 +773,7 @@ impl StorageReader {
}
/// Validates that all the storage invariants held in the data
#[ cfg(target_family = " wasm " ) ]
#[ cfg(any( target_family = " wasm " , not(feature = " rocksdb " )) )]
#[ allow(clippy::unused_self, clippy::unnecessary_wraps) ]
pub fn validate ( & self ) -> Result < ( ) , StorageError > {
Ok ( ( ) ) // TODO
@ -997,7 +997,7 @@ impl<'a> StorageWriter<'a> {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
fn insert_str ( & mut self , key : & StrHash , value : & str ) -> Result < ( ) , StorageError > {
if self
. storage
@ -1013,7 +1013,7 @@ impl<'a> StorageWriter<'a> {
)
}
#[ cfg(target_family = " wasm " ) ]
#[ cfg(any( target_family = " wasm " , not(feature = " rocksdb " )) )]
fn insert_str ( & mut self , key : & StrHash , value : & str ) -> Result < ( ) , StorageError > {
self . transaction . insert (
& self . storage . id2str_cf ,
@ -1178,7 +1178,7 @@ impl<'a> StorageWriter<'a> {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
#[ must_use ]
pub struct StorageBulkLoader {
storage : Storage ,
@ -1187,7 +1187,7 @@ pub struct StorageBulkLoader {
max_memory_size : Option < usize > ,
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
impl StorageBulkLoader {
pub fn new ( storage : Storage ) -> Self {
Self {
@ -1318,7 +1318,7 @@ impl StorageBulkLoader {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
struct FileBulkLoader < ' a > {
storage : & ' a Storage ,
id2str : HashMap < StrHash , Box < str > > ,
@ -1327,7 +1327,7 @@ struct FileBulkLoader<'a> {
graphs : HashSet < EncodedTerm > ,
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
impl < ' a > FileBulkLoader < ' a > {
fn new ( storage : & ' a Storage , batch_size : usize ) -> Self {
Self {
@ -1533,7 +1533,7 @@ impl<'a> FileBulkLoader<'a> {
}
}
#[ cfg(not(target_family = " wasm " )) ]
#[ cfg(all( not(target_family = " wasm " ), feature = " rocksdb " )) ]
fn map_thread_result < R > ( result : thread ::Result < R > ) -> io ::Result < R > {
result . map_err ( | e | {
io ::Error ::new (