@ -77,9 +77,11 @@ const GOSP_CF: &str = "gosp";
const DSPO_CF : & str = "dspo" ;
const DPOS_CF : & str = "dpos" ;
const DOSP_CF : & str = "dosp" ;
const GRAPHS_CF : & str = "graphs" ;
const COLUMN_FAMILIES : [ & str ; 10 ] = [
const COLUMN_FAMILIES : [ & str ; 11 ] = [
ID2STR_CF , SPOG_CF , POSG_CF , OSPG_CF , GSPO_CF , GPOS_CF , GOSP_CF , DSPO_CF , DPOS_CF , DOSP_CF ,
GRAPHS_CF ,
] ;
const MAX_TRANSACTION_SIZE : usize = 1024 ;
@ -96,15 +98,33 @@ impl RocksDbStore {
db : Arc ::new ( DB ::open_cf ( & options , path , & COLUMN_FAMILIES ) . map_err ( map_err ) ? ) ,
} ;
let version = this . ensure_version ( ) ? ;
if version ! = LATEST_STORAGE_VERSION {
return Err ( invalid_data_error ( format! (
"The RocksDB database is still using the encoding version {}, please upgrade it" ,
version
) ) ) ;
let mut version = this . ensure_version ( ) ? ;
if version = = 0 {
// We migrate to v1
let mut transaction = this . auto_batch_writer ( ) ;
for quad in this . encoded_quads_for_pattern ( None , None , None , None ) {
let quad = quad ? ;
if ! quad . graph_name . is_default_graph ( ) {
transaction . insert_encoded_named_graph ( quad . graph_name ) ? ;
}
}
transaction . apply ( ) ? ;
version = 1 ;
this . set_version ( version ) ? ;
this . flush ( ) ? ;
}
Ok ( this )
match version {
_ if version < LATEST_STORAGE_VERSION = > Err ( invalid_data_error ( format! (
"The RocksDB database is using the outdated encoding version {}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version" ,
version
) ) ) ,
LATEST_STORAGE_VERSION = > Ok ( this ) ,
_ = > Err ( invalid_data_error ( format! (
"The RocksDB database is using the too recent version {}. Upgrade to the latest Oxigraph version to load this database" ,
version
) ) )
}
}
fn ensure_version ( & self ) -> Result < u64 , io ::Error > {
@ -114,14 +134,25 @@ impl RocksDbStore {
buffer . copy_from_slice ( & version ) ;
u64 ::from_be_bytes ( buffer )
} else {
self . db
. put ( "oxversion" , & LATEST_STORAGE_VERSION . to_be_bytes ( ) )
. map_err ( map_err ) ? ;
self . set_version ( LATEST_STORAGE_VERSION ) ? ;
LATEST_STORAGE_VERSION
} ,
)
}
fn set_version ( & self , version : u64 ) -> Result < ( ) , io ::Error > {
self . db
. put ( "oxversion" , & version . to_be_bytes ( ) )
. map_err ( map_err )
}
fn flush ( & self ) -> Result < ( ) , io ::Error > {
let mut options = FlushOptions ::new ( ) ;
options . set_wait ( true ) ;
self . db . flush_opt ( & options ) . map_err ( map_err ) ? ;
Ok ( ( ) )
}
/// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/).
///
/// See [`MemoryStore`](super::memory::MemoryStore::query()) for a usage example.
@ -352,15 +383,68 @@ impl RocksDbStore {
dump_dataset ( self . iter ( ) , writer , syntax )
}
/// Removes a graph from this store.
/// Returns all the store named graphs
///
/// See [`MemoryStore`](super::memory::MemoryStore::named_graphs()) for a usage example.
pub fn named_graphs ( & self ) -> impl Iterator < Item = Result < NamedOrBlankNode , io ::Error > > {
let this = self . clone ( ) ;
self . encoded_named_graphs ( )
. map ( move | g | Ok ( this . decode_named_or_blank_node ( g ? ) ? ) )
}
/// Checks if the store contains a given graph
///
/// See [`MemoryStore`](super::memory::MemoryStore::contains_named_graph()) for a usage example.
pub fn contains_named_graph < ' a > (
& self ,
graph_name : impl Into < NamedOrBlankNodeRef < ' a > > ,
) -> Result < bool , io ::Error > {
if let Some ( graph_name ) = self . get_encoded_named_or_blank_node ( graph_name . into ( ) ) ? {
self . contains_encoded_named_graph ( graph_name )
} else {
Ok ( false )
}
}
/// Inserts a graph into this store
///
/// See [`MemoryStore`](super::memory::MemoryStore::insert_named_graph()) for a usage example.
pub fn insert_named_graph < ' a > (
& self ,
graph_name : impl Into < NamedOrBlankNodeRef < ' a > > ,
) -> Result < ( ) , io ::Error > {
let mut transaction = self . auto_batch_writer ( ) ;
let graph_name = transaction . encode_named_or_blank_node ( graph_name . into ( ) ) ? ;
transaction . insert_encoded_named_graph ( graph_name ) ? ;
transaction . apply ( )
}
/// Clears a graph from this store.
///
/// See [`MemoryStore`](super::memory::MemoryStore::drop_graph()) for a usage example.
pub fn drop_graph < ' a > ( & self , graph_name : impl Into < GraphNameRef < ' a > > ) -> Result < ( ) , io ::Error > {
/// See [`MemoryStore`](super::memory::MemoryStore::clear_graph()) for a usage example.
pub fn clear_graph < ' a > (
& self ,
graph_name : impl Into < GraphNameRef < ' a > > ,
) -> Result < ( ) , io ::Error > {
if let Some ( graph_name ) = self . get_encoded_graph_name ( graph_name . into ( ) ) ? {
let mut transaction = self . auto_batch_writer ( ) ;
for quad in self . encoded_quads_for_pattern ( None , None , None , Some ( graph_name ) ) {
transaction . remove_encoded ( & quad ? ) ? ;
}
transaction . clear_encoded_graph ( graph_name ) ? ;
transaction . apply ( )
} else {
Ok ( ( ) )
}
}
/// Removes a graph from this store.
///
/// See [`MemoryStore`](super::memory::MemoryStore::remove_named_graph()) for a usage example.
pub fn remove_named_graph < ' a > (
& self ,
graph_name : impl Into < NamedOrBlankNodeRef < ' a > > ,
) -> Result < ( ) , io ::Error > {
if let Some ( graph_name ) = self . get_encoded_named_or_blank_node ( graph_name . into ( ) ) ? {
let mut transaction = self . auto_batch_writer ( ) ;
transaction . remove_encoded_named_graph ( graph_name ) ? ;
transaction . apply ( )
} else {
Ok ( ( ) )
@ -371,17 +455,9 @@ impl RocksDbStore {
///
/// See [`MemoryStore`](super::memory::MemoryStore::clear()) for a usage example.
pub fn clear ( & self ) -> Result < ( ) , io ::Error > {
self . clear_cf ( self . id2str_cf ( ) ) ? ;
self . clear_cf ( self . spog_cf ( ) ) ? ;
self . clear_cf ( self . posg_cf ( ) ) ? ;
self . clear_cf ( self . ospg_cf ( ) ) ? ;
self . clear_cf ( self . gspo_cf ( ) ) ? ;
self . clear_cf ( self . gpos_cf ( ) ) ? ;
self . clear_cf ( self . gosp_cf ( ) ) ? ;
self . clear_cf ( self . dspo_cf ( ) ) ? ;
self . clear_cf ( self . dpos_cf ( ) ) ? ;
self . clear_cf ( self . dosp_cf ( ) ) ? ;
Ok ( ( ) )
let mut transaction = self . auto_batch_writer ( ) ;
transaction . clear ( ) ? ;
transaction . apply ( )
}
fn id2str_cf ( & self ) -> & ColumnFamily {
@ -424,6 +500,9 @@ impl RocksDbStore {
get_cf ( & self . db , DOSP_CF )
}
fn graphs_cf ( & self ) -> & ColumnFamily {
get_cf ( & self . db , GRAPHS_CF )
}
fn auto_batch_writer ( & self ) -> AutoBatchWriter < ' _ > {
AutoBatchWriter {
store : self ,
@ -657,48 +736,25 @@ impl RocksDbStore {
self . inner_quads ( self . dosp_cf ( ) , prefix , QuadEncoding ::DOSP )
}
#[ allow(unsafe_code) ]
fn inner_quads (
& self ,
cf : & ColumnFamily ,
prefix : Vec < u8 > ,
encoding : QuadEncoding ,
) -> DecodingIndexIterator {
let mut iter = self . db . raw _iterator_cf ( cf ) ;
iter . seek ( & prefix ) ;
let mut iter = self . db_iter ( cf ) ;
iter . iter . seek ( & prefix ) ;
DecodingIndexIterator {
iter : unsafe { StaticDBRowIterator ::new ( iter , self . db . clone ( ) ) } , // This is safe because the iterator belongs to DB
iter ,
prefix ,
encoding ,
}
}
fn clear_cf ( & self , cf : & ColumnFamily ) -> Result < ( ) , io ::Error > {
self . db
. delete_range_cf (
cf ,
[
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
] ,
[
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
] ,
)
. map_err ( map_err )
#[ allow(unsafe_code) ]
fn db_iter ( & self , cf : & ColumnFamily ) -> StaticDBRowIterator {
// Valid because it's the same database so db can't be dropped before iter
unsafe { StaticDBRowIterator ::new ( self . db . raw_iterator_cf ( cf ) , self . db . clone ( ) ) }
}
}
@ -745,6 +801,7 @@ impl StrLookup for RocksDbStore {
impl ReadableEncodedStore for RocksDbStore {
type QuadsIter = DecodingIndexesIterator ;
type GraphsIter = DecodingGraphIterator ;
fn encoded_quads_for_pattern (
& self ,
@ -809,6 +866,20 @@ impl ReadableEncodedStore for RocksDbStore {
} ,
}
}
fn encoded_named_graphs ( & self ) -> DecodingGraphIterator {
let mut iter = self . db_iter ( self . graphs_cf ( ) ) ;
iter . iter . seek_to_first ( ) ;
DecodingGraphIterator { iter }
}
fn contains_encoded_named_graph ( & self , graph_name : EncodedTerm ) -> Result < bool , io ::Error > {
Ok ( self
. db
. get_cf ( self . graphs_cf ( ) , & encode_term ( graph_name ) )
. map_err ( map_err ) ?
. is_some ( ) )
}
}
struct AutoBatchWriter < ' a > {
@ -831,6 +902,32 @@ impl AutoBatchWriter<'_> {
}
Ok ( ( ) )
}
fn clear_cf ( & mut self , cf : & ColumnFamily ) {
self . batch . delete_range_cf (
cf ,
[
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
u8 ::MIN ,
] ,
[
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
u8 ::MAX ,
] ,
)
}
}
impl StrEncodingAware for AutoBatchWriter < ' _ > {
@ -885,6 +982,10 @@ impl WritableEncodedStore for AutoBatchWriter<'_> {
write_gosp_quad ( & mut self . buffer , quad ) ;
self . batch . put_cf ( self . store . gosp_cf ( ) , & self . buffer , & [ ] ) ;
self . buffer . clear ( ) ;
write_term ( & mut self . buffer , quad . graph_name ) ;
self . batch . put_cf ( self . store . graphs_cf ( ) , & self . buffer , & [ ] ) ;
self . buffer . clear ( ) ;
}
self . apply_if_big ( )
@ -931,6 +1032,49 @@ impl WritableEncodedStore for AutoBatchWriter<'_> {
self . apply_if_big ( )
}
fn insert_encoded_named_graph ( & mut self , graph_name : EncodedTerm ) -> Result < ( ) , io ::Error > {
self . batch
. put_cf ( self . store . graphs_cf ( ) , & encode_term ( graph_name ) , & [ ] ) ;
self . apply_if_big ( )
}
fn clear_encoded_graph ( & mut self , graph_name : EncodedTerm ) -> Result < ( ) , io ::Error > {
if graph_name . is_default_graph ( ) {
self . clear_cf ( self . store . dspo_cf ( ) ) ;
self . clear_cf ( self . store . dpos_cf ( ) ) ;
self . clear_cf ( self . store . dosp_cf ( ) ) ;
} else {
for quad in self . store . quads_for_graph ( graph_name ) {
self . remove_encoded ( & quad ? ) ? ;
}
}
self . apply_if_big ( )
}
fn remove_encoded_named_graph ( & mut self , graph_name : EncodedTerm ) -> Result < ( ) , io ::Error > {
for quad in self . store . quads_for_graph ( graph_name ) {
self . remove_encoded ( & quad ? ) ? ;
}
self . batch
. delete_cf ( self . store . graphs_cf ( ) , & encode_term ( graph_name ) ) ;
self . apply_if_big ( )
}
fn clear ( & mut self ) -> Result < ( ) , io ::Error > {
self . clear_cf ( self . store . spog_cf ( ) ) ;
self . clear_cf ( self . store . posg_cf ( ) ) ;
self . clear_cf ( self . store . ospg_cf ( ) ) ;
self . clear_cf ( self . store . gspo_cf ( ) ) ;
self . clear_cf ( self . store . gpos_cf ( ) ) ;
self . clear_cf ( self . store . gosp_cf ( ) ) ;
self . clear_cf ( self . store . dspo_cf ( ) ) ;
self . clear_cf ( self . store . dpos_cf ( ) ) ;
self . clear_cf ( self . store . dosp_cf ( ) ) ;
self . clear_cf ( self . store . graphs_cf ( ) ) ;
self . clear_cf ( self . store . id2str_cf ( ) ) ;
self . apply_if_big ( )
}
}
/// Allows inserting and deleting quads during an ACID transaction with the [`RocksDbStore`].
@ -1093,6 +1237,10 @@ impl WritableEncodedStore for RocksDbTransaction<'_> {
write_gosp_quad ( & mut self . buffer , quad ) ;
self . batch . put_cf ( self . store . gosp_cf ( ) , & self . buffer , & [ ] ) ;
self . buffer . clear ( ) ;
write_term ( & mut self . buffer , quad . graph_name ) ;
self . batch . put_cf ( self . store . graphs_cf ( ) , & self . buffer , & [ ] ) ;
self . buffer . clear ( ) ;
}
Ok ( ( ) )
@ -1139,6 +1287,33 @@ impl WritableEncodedStore for RocksDbTransaction<'_> {
Ok ( ( ) )
}
fn insert_encoded_named_graph ( & mut self , graph_name : EncodedTerm ) -> Result < ( ) , io ::Error > {
self . batch
. put_cf ( self . store . graphs_cf ( ) , & encode_term ( graph_name ) , & [ ] ) ;
Ok ( ( ) )
}
fn clear_encoded_graph ( & mut self , _ : EncodedTerm ) -> Result < ( ) , io ::Error > {
Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
"CLEAR is not implemented in RocksDB transactions" ,
) )
}
fn remove_encoded_named_graph ( & mut self , _ : EncodedTerm ) -> Result < ( ) , io ::Error > {
Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
"DROP is not implemented in RocksDB transactions" ,
) )
}
fn clear ( & mut self ) -> Result < ( ) , Self ::Error > {
Err ( io ::Error ::new (
io ::ErrorKind ::Other ,
"CLEAR ALL is not implemented in RocksDB transactions" ,
) )
}
}
#[ allow(clippy::expect_used) ]
@ -1267,6 +1442,23 @@ impl Iterator for RocksDbQuadIter {
}
}
pub ( crate ) struct DecodingGraphIterator {
iter : StaticDBRowIterator ,
}
impl Iterator for DecodingGraphIterator {
type Item = Result < EncodedTerm , io ::Error > ;
fn next ( & mut self ) -> Option < Result < EncodedTerm , io ::Error > > {
if let Some ( key ) = self . iter . key ( ) {
let result = decode_term ( key ) ;
self . iter . next ( ) ;
Some ( result )
} else {
None
}
}
}
#[ test ]
fn store ( ) -> Result < ( ) , io ::Error > {
use crate ::model ::* ;