Compare commits

...

2 Commits

  1. 6
      Cargo.lock
  2. 1
      Cargo.toml
  3. 2
      lib/oxigraph/Cargo.toml
  4. 8
      lib/oxigraph/src/storage/backend/mod.rs
  5. 164
      lib/oxigraph/src/storage/backend/oxi_rocksdb.rs
  6. 2
      lib/oxigraph/src/storage/binary_encoder.rs
  7. 116
      lib/oxigraph/src/storage/mod.rs
  8. 74
      lib/oxigraph/src/store.rs
  9. 240
      lib/oxigraph/tests/store.rs

6
Cargo.lock generated

@ -629,13 +629,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"windows-targets 0.48.5", "windows-targets 0.52.4",
] ]
[[package]] [[package]]
name = "librocksdb-sys" name = "librocksdb-sys"
version = "0.11.0+8.3.2" version = "0.11.0+8.3.2"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352" source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be"
dependencies = [ dependencies = [
"bindgen", "bindgen",
"bzip2-sys", "bzip2-sys",
@ -1118,7 +1118,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]] [[package]]
name = "rocksdb" name = "rocksdb"
version = "0.21.0" version = "0.21.0"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352" source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be"
dependencies = [ dependencies = [
"libc", "libc",
"librocksdb-sys", "librocksdb-sys",

@ -23,6 +23,7 @@ edition = "2021"
rust-version = "1.70" rust-version = "1.70"
[workspace.dependencies] [workspace.dependencies]
rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ] }
anyhow = "1.0.72" anyhow = "1.0.72"
arbitrary = "1.3" arbitrary = "1.3"
assert_cmd = "2.0" assert_cmd = "2.0"

@ -41,7 +41,7 @@ thiserror.workspace = true
[target.'cfg(not(target_family = "wasm"))'.dependencies] [target.'cfg(not(target_family = "wasm"))'.dependencies]
libc = "0.2" libc = "0.2"
rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ], optional = true } rocksdb.workspace = true
[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] [target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
getrandom.workspace = true getrandom.workspace = true

@ -1,12 +1,12 @@
//! A storage backend //! A storage backend
//! RocksDB is available, if not in memory //! RocksDB is available, if not in memory
#[cfg(any(target_family = "wasm", not(feature = "rocksdb")))] #[cfg(any(target_family = "wasm"))]
pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction}; pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub use oxi_rocksdb::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction}; pub use oxi_rocksdb::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(any(target_family = "wasm", not(feature = "rocksdb")))] #[cfg(any(target_family = "wasm"))]
mod fallback; mod fallback;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
mod oxi_rocksdb; mod oxi_rocksdb;

@ -28,6 +28,13 @@ use std::sync::{Arc, OnceLock};
use std::thread::{available_parallelism, yield_now}; use std::thread::{available_parallelism, yield_now};
use std::{fmt, io, ptr, slice}; use std::{fmt, io, ptr, slice};
pub fn opt_bytes_to_ptr<T: AsRef<[u8]>>(opt: Option<T>) -> *const c_char {
match opt {
Some(v) => v.as_ref().as_ptr() as *const c_char,
None => ptr::null(),
}
}
macro_rules! ffi_result { macro_rules! ffi_result {
( $($function:ident)::*( $arg1:expr $(, $arg:expr)* $(,)? ) ) => {{ ( $($function:ident)::*( $arg1:expr $(, $arg:expr)* $(,)? ) ) => {{
let mut status = rocksdb_status_t { let mut status = rocksdb_status_t {
@ -148,13 +155,17 @@ impl Drop for RoDbHandler {
} }
impl Db { impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> { pub fn new(
Self::open_read_write(None, column_families) column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
Self::open_read_write(None, column_families, key)
} }
pub fn open_read_write( pub fn open_read_write(
path: Option<&Path>, path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>, column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
let (path, in_memory) = if let Some(path) = path { let (path, in_memory) = if let Some(path) = path {
(path.to_path_buf(), false) (path.to_path_buf(), false)
@ -163,7 +174,7 @@ impl Db {
}; };
let c_path = path_to_cstring(&path)?; let c_path = path_to_cstring(&path)?;
unsafe { unsafe {
let options = Self::db_options(true, in_memory)?; let options = Self::db_options(true, in_memory, key)?;
rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1); rocksdb_options_set_create_missing_column_families(options, 1);
rocksdb_options_set_compression( rocksdb_options_set_compression(
@ -305,82 +316,83 @@ impl Db {
} }
} }
pub fn open_secondary( // pub fn open_secondary(
primary_path: &Path, // primary_path: &Path,
secondary_path: Option<&Path>, // secondary_path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>, // column_families: Vec<ColumnFamilyDefinition>,
) -> Result<Self, StorageError> { // ) -> Result<Self, StorageError> {
let c_primary_path = path_to_cstring(primary_path)?; // let c_primary_path = path_to_cstring(primary_path)?;
let (secondary_path, in_memory) = if let Some(path) = secondary_path { // let (secondary_path, in_memory) = if let Some(path) = secondary_path {
(path.to_path_buf(), false) // (path.to_path_buf(), false)
} else { // } else {
(tmp_path(), true) // (tmp_path(), true)
}; // };
let c_secondary_path = path_to_cstring(&secondary_path)?; // let c_secondary_path = path_to_cstring(&secondary_path)?;
unsafe { // unsafe {
let options = Self::db_options(false, false)?; // let options = Self::db_options(false, false)?;
let (column_family_names, c_column_family_names, cf_options) = // let (column_family_names, c_column_family_names, cf_options) =
Self::column_families_names_and_options(column_families, options); // Self::column_families_names_and_options(column_families, options);
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = // let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_family_names.len()]; // vec![ptr::null_mut(); column_family_names.len()];
let c_num_column_families = c_column_family_names.len().try_into().unwrap(); // let c_num_column_families = c_column_family_names.len().try_into().unwrap();
let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status( // let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status(
options, // options,
c_primary_path.as_ptr(), // c_primary_path.as_ptr(),
c_secondary_path.as_ptr(), // c_secondary_path.as_ptr(),
c_num_column_families, // c_num_column_families,
c_column_family_names // c_column_family_names
.iter() // .iter()
.map(|cf| cf.as_ptr()) // .map(|cf| cf.as_ptr())
.collect::<Vec<_>>() // .collect::<Vec<_>>()
.as_ptr(), // .as_ptr(),
cf_options.as_ptr().cast(), // cf_options.as_ptr().cast(),
cf_handles.as_mut_ptr(), // cf_handles.as_mut_ptr(),
)) // ))
.map_err(|e| { // .map_err(|e| {
for cf_option in &cf_options { // for cf_option in &cf_options {
rocksdb_options_destroy(*cf_option); // rocksdb_options_destroy(*cf_option);
} // }
rocksdb_options_destroy(options); // rocksdb_options_destroy(options);
e // e
})?; // })?;
assert!( // assert!(
!db.is_null(), // !db.is_null(),
"rocksdb_open_for_read_only_column_families_with_status returned null" // "rocksdb_open_for_read_only_column_families_with_status returned null"
); // );
for handle in &cf_handles { // for handle in &cf_handles {
assert!( // assert!(
!handle.is_null(), // !handle.is_null(),
"rocksdb_open_for_read_only_column_families_with_status returned a null column family" // "rocksdb_open_for_read_only_column_families_with_status returned a null column family"
); // );
} // }
let read_options = rocksdb_readoptions_create(); // let read_options = rocksdb_readoptions_create();
assert!( // assert!(
!read_options.is_null(), // !read_options.is_null(),
"rocksdb_readoptions_create returned null" // "rocksdb_readoptions_create returned null"
); // );
Ok(Self { // Ok(Self {
inner: DbKind::ReadOnly(Arc::new(RoDbHandler { // inner: DbKind::ReadOnly(Arc::new(RoDbHandler {
db, // db,
options, // options,
read_options, // read_options,
column_family_names, // column_family_names,
cf_handles, // cf_handles,
cf_options, // cf_options,
is_secondary: true, // is_secondary: true,
path_to_remove: in_memory.then_some(secondary_path), // path_to_remove: in_memory.then_some(secondary_path),
})), // })),
}) // })
} // }
} // }
pub fn open_read_only( pub fn open_read_only(
path: &Path, path: &Path,
column_families: Vec<ColumnFamilyDefinition>, column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> { ) -> Result<Self, StorageError> {
unsafe { unsafe {
let c_path = path_to_cstring(path)?; let c_path = path_to_cstring(path)?;
let options = Self::db_options(true, false)?; let options = Self::db_options(true, false, key)?;
let (column_family_names, c_column_family_names, cf_options) = let (column_family_names, c_column_family_names, cf_options) =
Self::column_families_names_and_options(column_families, options); Self::column_families_names_and_options(column_families, options);
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
@ -440,6 +452,7 @@ impl Db {
fn db_options( fn db_options(
limit_max_open_files: bool, limit_max_open_files: bool,
in_memory: bool, in_memory: bool,
key: Option<[u8; 32]>,
) -> Result<*mut rocksdb_options_t, StorageError> { ) -> Result<*mut rocksdb_options_t, StorageError> {
static ROCKSDB_ENV: OnceLock<UnsafeEnv> = OnceLock::new(); static ROCKSDB_ENV: OnceLock<UnsafeEnv> = OnceLock::new();
static ROCKSDB_MEM_ENV: OnceLock<UnsafeEnv> = OnceLock::new(); static ROCKSDB_MEM_ENV: OnceLock<UnsafeEnv> = OnceLock::new();
@ -487,8 +500,11 @@ impl Db {
}) })
} else { } else {
ROCKSDB_ENV.get_or_init(|| { ROCKSDB_ENV.get_or_init(|| {
let env = rocksdb_create_default_env(); let env = match key {
assert!(!env.is_null(), "rocksdb_create_default_env returned null"); Some(_) => rocksdb_create_encrypted_env(opt_bytes_to_ptr(key.as_ref())),
None => rocksdb_create_default_env(),
};
assert!(!env.is_null(), "rocksdb_create_encrypted_env returned null");
UnsafeEnv(env) UnsafeEnv(env)
}) })
} }

@ -5,7 +5,7 @@ use oxsdatatypes::*;
use std::io::Read; use std::io::Read;
use std::mem::size_of; use std::mem::size_of;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub const LATEST_STORAGE_VERSION: u64 = 1; pub const LATEST_STORAGE_VERSION: u64 = 1;
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>(); pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();

@ -1,9 +1,9 @@
#![allow(clippy::same_name_method)] #![allow(clippy::same_name_method)]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use crate::model::Quad; use crate::model::Quad;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef}; use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
use crate::storage::backend::{Reader, Transaction}; use crate::storage::backend::{Reader, Transaction};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use crate::storage::binary_encoder::LATEST_STORAGE_VERSION; use crate::storage::binary_encoder::LATEST_STORAGE_VERSION;
use crate::storage::binary_encoder::{ use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple, decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
@ -12,22 +12,22 @@ use crate::storage::binary_encoder::{
WRITTEN_TERM_MAX_SIZE, WRITTEN_TERM_MAX_SIZE,
}; };
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError}; pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use crate::storage::numeric_encoder::Decoder; use crate::storage::numeric_encoder::Decoder;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::collections::VecDeque; use std::collections::VecDeque;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::error::Error; use std::error::Error;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::mem::{swap, take}; use std::mem::{swap, take};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::sync::Mutex; use std::sync::Mutex;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::{io, thread}; use std::{io, thread};
mod backend; mod backend;
@ -47,16 +47,16 @@ const DSPO_CF: &str = "dspo";
const DPOS_CF: &str = "dpos"; const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp"; const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs"; const GRAPHS_CF: &str = "graphs";
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
const DEFAULT_CF: &str = "default"; const DEFAULT_CF: &str = "default";
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
/// Low level storage primitives /// Low level storage primitives
#[derive(Clone)] #[derive(Clone)]
pub struct Storage { pub struct Storage {
db: Db, db: Db,
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
default_cf: ColumnFamily, default_cf: ColumnFamily,
id2str_cf: ColumnFamily, id2str_cf: ColumnFamily,
spog_cf: ColumnFamily, spog_cf: ColumnFamily,
@ -72,39 +72,43 @@ pub struct Storage {
} }
impl Storage { impl Storage {
pub fn new() -> Result<Self, StorageError> { pub fn new(key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::new(Self::column_families())?) Self::setup(Db::new(Self::column_families(), key)?)
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn open(path: &Path) -> Result<Self, StorageError> { pub fn open(path: &Path, key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::open_read_write(Some(path), Self::column_families())?) Self::setup(Db::open_read_write(
} Some(path),
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary(
primary_path,
None,
Self::column_families(),
)?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_persistent_secondary(
primary_path: &Path,
secondary_path: &Path,
) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary(
primary_path,
Some(secondary_path),
Self::column_families(), Self::column_families(),
key,
)?) )?)
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] // #[cfg(all(not(target_family = "wasm")))]
pub fn open_read_only(path: &Path) -> Result<Self, StorageError> { // pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_read_only(path, Self::column_families())?) // Self::setup(Db::open_secondary(
// primary_path,
// None,
// Self::column_families(),
// )?)
// }
// #[cfg(all(not(target_family = "wasm")))]
// pub fn open_persistent_secondary(
// primary_path: &Path,
// secondary_path: &Path,
// ) -> Result<Self, StorageError> {
// Self::setup(Db::open_secondary(
// primary_path,
// Some(secondary_path),
// Self::column_families(),
// )?)
// }
#[cfg(all(not(target_family = "wasm")))]
pub fn open_read_only(path: &Path, key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::open_read_only(path, Self::column_families(), key)?)
} }
fn column_families() -> Vec<ColumnFamilyDefinition> { fn column_families() -> Vec<ColumnFamilyDefinition> {
@ -180,7 +184,7 @@ impl Storage {
fn setup(db: Db) -> Result<Self, StorageError> { fn setup(db: Db) -> Result<Self, StorageError> {
let this = Self { let this = Self {
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
default_cf: db.column_family(DEFAULT_CF)?, default_cf: db.column_family(DEFAULT_CF)?,
id2str_cf: db.column_family(ID2STR_CF)?, id2str_cf: db.column_family(ID2STR_CF)?,
spog_cf: db.column_family(SPOG_CF)?, spog_cf: db.column_family(SPOG_CF)?,
@ -195,12 +199,12 @@ impl Storage {
graphs_cf: db.column_family(GRAPHS_CF)?, graphs_cf: db.column_family(GRAPHS_CF)?,
db, db,
}; };
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
this.migrate()?; this.migrate()?;
Ok(this) Ok(this)
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn migrate(&self) -> Result<(), StorageError> { fn migrate(&self) -> Result<(), StorageError> {
let mut version = self.ensure_version()?; let mut version = self.ensure_version()?;
if version == 0 { if version == 0 {
@ -240,7 +244,7 @@ impl Storage {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn ensure_version(&self) -> Result<u64, StorageError> { fn ensure_version(&self) -> Result<u64, StorageError> {
Ok( Ok(
if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? { if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
@ -254,7 +258,7 @@ impl Storage {
) )
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn update_version(&self, version: u64) -> Result<(), StorageError> { fn update_version(&self, version: u64) -> Result<(), StorageError> {
self.db self.db
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
@ -281,12 +285,12 @@ impl Storage {
}) })
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn flush(&self) -> Result<(), StorageError> { pub fn flush(&self) -> Result<(), StorageError> {
self.db.flush() self.db.flush()
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn compact(&self) -> Result<(), StorageError> { pub fn compact(&self) -> Result<(), StorageError> {
self.db.compact(&self.default_cf)?; self.db.compact(&self.default_cf)?;
self.db.compact(&self.gspo_cf)?; self.db.compact(&self.gspo_cf)?;
@ -301,7 +305,7 @@ impl Storage {
self.db.compact(&self.id2str_cf) self.db.compact(&self.id2str_cf)
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
self.db.backup(target_directory) self.db.backup(target_directory)
} }
@ -626,7 +630,7 @@ impl StorageReader {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> { pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self Ok(self
.storage .storage
@ -647,7 +651,7 @@ impl StorageReader {
.map_err(CorruptionError::new)?) .map_err(CorruptionError::new)?)
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> { pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.storage self.storage
.db .db
@ -661,7 +665,7 @@ impl StorageReader {
} }
/// Validates that all the storage invariants held in the data /// Validates that all the storage invariants held in the data
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn validate(&self) -> Result<(), StorageError> { pub fn validate(&self) -> Result<(), StorageError> {
// triples // triples
let dspo_size = self.dspo_quads(&[]).count(); let dspo_size = self.dspo_quads(&[]).count();
@ -997,7 +1001,7 @@ impl<'a> StorageWriter<'a> {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
if self if self
.storage .storage
@ -1178,7 +1182,7 @@ impl<'a> StorageWriter<'a> {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
#[must_use] #[must_use]
pub struct StorageBulkLoader { pub struct StorageBulkLoader {
storage: Storage, storage: Storage,
@ -1187,7 +1191,7 @@ pub struct StorageBulkLoader {
max_memory_size: Option<usize>, max_memory_size: Option<usize>,
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl StorageBulkLoader { impl StorageBulkLoader {
pub fn new(storage: Storage) -> Self { pub fn new(storage: Storage) -> Self {
Self { Self {
@ -1318,7 +1322,7 @@ impl StorageBulkLoader {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
struct FileBulkLoader<'a> { struct FileBulkLoader<'a> {
storage: &'a Storage, storage: &'a Storage,
id2str: HashMap<StrHash, Box<str>>, id2str: HashMap<StrHash, Box<str>>,
@ -1327,7 +1331,7 @@ struct FileBulkLoader<'a> {
graphs: HashSet<EncodedTerm>, graphs: HashSet<EncodedTerm>,
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl<'a> FileBulkLoader<'a> { impl<'a> FileBulkLoader<'a> {
fn new(storage: &'a Storage, batch_size: usize) -> Self { fn new(storage: &'a Storage, batch_size: usize) -> Self {
Self { Self {
@ -1533,7 +1537,7 @@ impl<'a> FileBulkLoader<'a> {
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn map_thread_result<R>(result: thread::Result<R>) -> io::Result<R> { fn map_thread_result<R>(result: thread::Result<R>) -> io::Result<R> {
result.map_err(|e| { result.map_err(|e| {
io::Error::new( io::Error::new(

@ -25,7 +25,7 @@
//! }; //! };
//! # Result::<_, Box<dyn std::error::Error>>::Ok(()) //! # Result::<_, Box<dyn std::error::Error>>::Ok(())
//! ``` //! ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use crate::io::RdfParseError; use crate::io::RdfParseError;
use crate::io::{RdfFormat, RdfParser, RdfSerializer}; use crate::io::{RdfFormat, RdfParser, RdfSerializer};
use crate::model::*; use crate::model::*;
@ -34,7 +34,7 @@ use crate::sparql::{
QueryResults, Update, UpdateOptions, QueryResults, Update, UpdateOptions,
}; };
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use crate::storage::StorageBulkLoader; use crate::storage::StorageBulkLoader;
use crate::storage::{ use crate::storage::{
ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter, ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
@ -42,7 +42,7 @@ use crate::storage::{
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError}; pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use std::error::Error; use std::error::Error;
use std::io::{Read, Write}; use std::io::{Read, Write};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::path::Path; use std::path::Path;
use std::{fmt, str}; use std::{fmt, str};
@ -91,7 +91,13 @@ impl Store {
/// Creates a temporary [`Store`] that will be deleted after drop. /// Creates a temporary [`Store`] that will be deleted after drop.
pub fn new() -> Result<Self, StorageError> { pub fn new() -> Result<Self, StorageError> {
Ok(Self { Ok(Self {
storage: Storage::new()?, storage: Storage::new(None)?,
})
}
pub fn new_with_key(key: [u8; 32]) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::new(Some(key))?,
}) })
} }
@ -100,30 +106,37 @@ impl Store {
/// Only one read-write [`Store`] can exist at the same time. /// Only one read-write [`Store`] can exist at the same time.
/// If you want to have extra [`Store`] instance opened on a same data /// If you want to have extra [`Store`] instance opened on a same data
/// use [`Store::open_secondary`] or [`Store::open_read_only`]. /// use [`Store::open_secondary`] or [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> { pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self { Ok(Self {
storage: Storage::open(path.as_ref())?, storage: Storage::open(path.as_ref(), None)?,
}) })
} }
/// Opens a read-only clone of a running read-write [`Store`]. #[cfg(all(not(target_family = "wasm")))]
/// pub fn open_with_key(path: impl AsRef<Path>, key: [u8; 32]) -> Result<Self, StorageError> {
/// Changes done while this process is running will be replicated after a possible lag.
///
/// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
/// `primary_path` must be the path of the primary instance.
/// This secondary instance will use temporary storage for the secondary instance cache.
/// If you prefer persistent storage use [`Store::open_persistent_secondary`].
///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_secondary(primary_path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self { Ok(Self {
storage: Storage::open_secondary(primary_path.as_ref())?, storage: Storage::open(path.as_ref(), Some(key))?,
}) })
} }
// /// Opens a read-only clone of a running read-write [`Store`].
// ///
// /// Changes done while this process is running will be replicated after a possible lag.
// ///
// /// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
// /// `primary_path` must be the path of the primary instance.
// /// This secondary instance will use temporary storage for the secondary instance cache.
// /// If you prefer persistent storage use [`Store::open_persistent_secondary`].
// ///
// /// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
// #[cfg(all(not(target_family = "wasm")))]
// pub fn open_secondary(primary_path: impl AsRef<Path>) -> Result<Self, StorageError> {
// Ok(Self {
// storage: Storage::open_secondary(primary_path.as_ref())?,
// })
// }
/// Opens a read-only clone of a running read-write [`Store`] with persistence of the secondary instance cache. /// Opens a read-only clone of a running read-write [`Store`] with persistence of the secondary instance cache.
/// ///
/// Changes done while this process is running will be replicated after a possible lag. /// Changes done while this process is running will be replicated after a possible lag.
@ -132,7 +145,7 @@ impl Store {
/// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache. /// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache.
/// ///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`]. /// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn open_persistent_secondary( pub fn open_persistent_secondary(
primary_path: impl AsRef<Path>, primary_path: impl AsRef<Path>,
secondary_path: impl AsRef<Path>, secondary_path: impl AsRef<Path>,
@ -149,10 +162,13 @@ impl Store {
/// ///
/// Opening as read-only while having an other process writing the database is undefined behavior. /// Opening as read-only while having an other process writing the database is undefined behavior.
/// [`Store::open_secondary`] should be used in this case. /// [`Store::open_secondary`] should be used in this case.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, StorageError> { pub fn open_read_only(
path: impl AsRef<Path>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
Ok(Self { Ok(Self {
storage: Storage::open_read_only(path.as_ref())?, storage: Storage::open_read_only(path.as_ref(), key)?,
}) })
} }
@ -930,7 +946,7 @@ impl Store {
/// Flushes all buffers and ensures that all writes are saved on disk. /// Flushes all buffers and ensures that all writes are saved on disk.
/// ///
/// Flushes are automatically done using background threads but might lag a little bit. /// Flushes are automatically done using background threads but might lag a little bit.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn flush(&self) -> Result<(), StorageError> { pub fn flush(&self) -> Result<(), StorageError> {
self.storage.flush() self.storage.flush()
} }
@ -940,7 +956,7 @@ impl Store {
/// Useful to call after a batch upload or another similar operation. /// Useful to call after a batch upload or another similar operation.
/// ///
/// <div class="warning">Can take hours on huge databases.</div> /// <div class="warning">Can take hours on huge databases.</div>
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn optimize(&self) -> Result<(), StorageError> { pub fn optimize(&self) -> Result<(), StorageError> {
self.storage.compact() self.storage.compact()
} }
@ -963,7 +979,7 @@ impl Store {
/// This allows cheap regular backups. /// This allows cheap regular backups.
/// ///
/// If you want to move your data to another RDF storage system, you should have a look at the [`Store::dump_to_write`] function instead. /// If you want to move your data to another RDF storage system, you should have a look at the [`Store::dump_to_write`] function instead.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn backup(&self, target_directory: impl AsRef<Path>) -> Result<(), StorageError> { pub fn backup(&self, target_directory: impl AsRef<Path>) -> Result<(), StorageError> {
self.storage.backup(target_directory.as_ref()) self.storage.backup(target_directory.as_ref())
} }
@ -990,7 +1006,7 @@ impl Store {
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(()) /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
pub fn bulk_loader(&self) -> BulkLoader { pub fn bulk_loader(&self) -> BulkLoader {
BulkLoader { BulkLoader {
storage: StorageBulkLoader::new(self.storage.clone()), storage: StorageBulkLoader::new(self.storage.clone()),
@ -1608,14 +1624,14 @@ impl Iterator for GraphNameIter {
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(()) /// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ``` /// ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
#[must_use] #[must_use]
pub struct BulkLoader { pub struct BulkLoader {
storage: StorageBulkLoader, storage: StorageBulkLoader,
on_parse_error: Option<Box<dyn Fn(RdfParseError) -> Result<(), RdfParseError>>>, on_parse_error: Option<Box<dyn Fn(RdfParseError) -> Result<(), RdfParseError>>>,
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl BulkLoader { impl BulkLoader {
/// Sets the maximal number of threads to be used by the bulk loader per operation. /// Sets the maximal number of threads to be used by the bulk loader per operation.
/// ///

@ -5,22 +5,22 @@ use oxigraph::io::RdfFormat;
use oxigraph::model::vocab::{rdf, xsd}; use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*; use oxigraph::model::*;
use oxigraph::store::Store; use oxigraph::store::Store;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use rand::random; use rand::random;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::env::temp_dir; use std::env::temp_dir;
use std::error::Error; use std::error::Error;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::fs::{create_dir_all, remove_dir_all, File}; use std::fs::{create_dir_all, remove_dir_all, File};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::io::Write; use std::io::Write;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::iter::empty; use std::iter::empty;
#[cfg(all(target_os = "linux", feature = "rocksdb"))] #[cfg(all(target_os = "linux"))]
use std::iter::once; use std::iter::once;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
#[cfg(all(target_os = "linux", feature = "rocksdb"))] #[cfg(all(target_os = "linux"))]
use std::process::Command; use std::process::Command;
#[allow(clippy::non_ascii_literal)] #[allow(clippy::non_ascii_literal)]
@ -121,7 +121,7 @@ fn test_load_graph() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> { fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store store
@ -135,7 +135,7 @@ fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> { fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store.bulk_loader().on_parse_error(|_| Ok(())).load_from_read( store.bulk_loader().on_parse_error(|_| Ok(())).load_from_read(
@ -154,7 +154,7 @@ fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_empty() -> Result<(), Box<dyn Error>> { fn test_bulk_load_empty() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store.bulk_loader().load_quads(empty::<Quad>())?; store.bulk_loader().load_quads(empty::<Quad>())?;
@ -177,7 +177,7 @@ fn test_load_dataset() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> { fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?; let store = Store::new()?;
store store
@ -258,7 +258,7 @@ fn test_snapshot_isolation_iterator() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dyn Error>> { fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new( let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"), NamedNodeRef::new_unchecked("http://example.com/s"),
@ -274,7 +274,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dy
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_open_bad_dir() -> Result<(), Box<dyn Error>> { fn test_open_bad_dir() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default(); let dir = TempDir::default();
create_dir_all(&dir.0)?; create_dir_all(&dir.0)?;
@ -286,7 +286,7 @@ fn test_open_bad_dir() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(target_os = "linux", feature = "rocksdb"))] #[cfg(all(target_os = "linux"))]
fn test_bad_stt_open() -> Result<(), Box<dyn Error>> { fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default(); let dir = TempDir::default();
let store = Store::open(&dir.0)?; let store = Store::open(&dir.0)?;
@ -303,48 +303,48 @@ fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
#[test] // #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] // #[cfg(all(not(target_family = "wasm")))]
fn test_backup() -> Result<(), Box<dyn Error>> { // fn test_backup() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new( // let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"), // NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"), // NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"), // NamedNodeRef::new_unchecked("http://example.com/o"),
GraphNameRef::DefaultGraph, // GraphNameRef::DefaultGraph,
); // );
let store_dir = TempDir::default(); // let store_dir = TempDir::default();
let backup_from_rw_dir = TempDir::default(); // let backup_from_rw_dir = TempDir::default();
let backup_from_ro_dir = TempDir::default(); // let backup_from_ro_dir = TempDir::default();
let backup_from_secondary_dir = TempDir::default(); // let backup_from_secondary_dir = TempDir::default();
let store = Store::open(&store_dir)?; // let store = Store::open(&store_dir)?;
store.insert(quad)?; // store.insert(quad)?;
let secondary_store = Store::open_secondary(&store_dir)?; // let secondary_store = Store::open_secondary(&store_dir)?;
store.flush()?; // store.flush()?;
store.backup(&backup_from_rw_dir)?; // store.backup(&backup_from_rw_dir)?;
secondary_store.backup(&backup_from_secondary_dir)?; // secondary_store.backup(&backup_from_secondary_dir)?;
store.remove(quad)?; // store.remove(quad)?;
assert!(!store.contains(quad)?); // assert!(!store.contains(quad)?);
let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?; // let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?;
backup_from_rw.validate()?; // backup_from_rw.validate()?;
assert!(backup_from_rw.contains(quad)?); // assert!(backup_from_rw.contains(quad)?);
backup_from_rw.backup(&backup_from_ro_dir)?; // backup_from_rw.backup(&backup_from_ro_dir)?;
let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?; // let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?;
backup_from_ro.validate()?; // backup_from_ro.validate()?;
assert!(backup_from_ro.contains(quad)?); // assert!(backup_from_ro.contains(quad)?);
let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?; // let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?;
backup_from_secondary.validate()?; // backup_from_secondary.validate()?;
assert!(backup_from_secondary.contains(quad)?); // assert!(backup_from_secondary.contains(quad)?);
Ok(()) // Ok(())
} // }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_bad_backup() -> Result<(), Box<dyn Error>> { fn test_bad_backup() -> Result<(), Box<dyn Error>> {
let store_dir = TempDir::default(); let store_dir = TempDir::default();
let backup_dir = TempDir::default(); let backup_dir = TempDir::default();
@ -355,7 +355,7 @@ fn test_bad_backup() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> { fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> {
let backup_dir = TempDir::default(); let backup_dir = TempDir::default();
Store::new()?.backup(&backup_dir).unwrap_err(); Store::new()?.backup(&backup_dir).unwrap_err();
@ -363,7 +363,7 @@ fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(target_os = "linux", feature = "rocksdb"))] #[cfg(all(target_os = "linux"))]
fn test_backward_compatibility() -> Result<(), Box<dyn Error>> { fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {
// We run twice to check if data is properly saved and closed // We run twice to check if data is properly saved and closed
for _ in 0..2 { for _ in 0..2 {
@ -386,63 +386,63 @@ fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
#[test] // #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] // #[cfg(all(not(target_family = "wasm")))]
fn test_secondary() -> Result<(), Box<dyn Error>> { // fn test_secondary() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new( // let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"), // NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"), // NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"), // NamedNodeRef::new_unchecked("http://example.com/o"),
GraphNameRef::DefaultGraph, // GraphNameRef::DefaultGraph,
); // );
let primary_dir = TempDir::default(); // let primary_dir = TempDir::default();
// We open the store // // We open the store
let primary = Store::open(&primary_dir)?; // let primary = Store::open(&primary_dir)?;
let secondary = Store::open_secondary(&primary_dir)?; // let secondary = Store::open_secondary(&primary_dir)?;
// We insert a quad // // We insert a quad
primary.insert(quad)?; // primary.insert(quad)?;
primary.flush()?; // primary.flush()?;
// It is readable from both stores // // It is readable from both stores
for store in &[&primary, &secondary] { // for store in &[&primary, &secondary] {
assert!(store.contains(quad)?); // assert!(store.contains(quad)?);
assert_eq!( // assert_eq!(
store.iter().collect::<Result<Vec<_>, _>>()?, // store.iter().collect::<Result<Vec<_>, _>>()?,
vec![quad.into_owned()] // vec![quad.into_owned()]
); // );
} // }
// We validate the states // // We validate the states
primary.validate()?; // primary.validate()?;
secondary.validate()?; // secondary.validate()?;
// We close the primary store and remove its content // // We close the primary store and remove its content
drop(primary); // drop(primary);
remove_dir_all(&primary_dir)?; // remove_dir_all(&primary_dir)?;
// We secondary store is still readable // // We secondary store is still readable
assert!(secondary.contains(quad)?); // assert!(secondary.contains(quad)?);
secondary.validate()?; // secondary.validate()?;
Ok(()) // Ok(())
} // }
#[test] // #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] // #[cfg(all(not(target_family = "wasm")))]
fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> { // fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> {
let primary_dir = TempDir::default(); // let primary_dir = TempDir::default();
create_dir_all(&primary_dir.0)?; // create_dir_all(&primary_dir.0)?;
{ // {
File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?; // File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?;
} // }
assert!(Store::open_secondary(&primary_dir).is_err()); // assert!(Store::open_secondary(&primary_dir).is_err());
Ok(()) // Ok(())
} // }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_read_only() -> Result<(), Box<dyn Error>> { fn test_read_only() -> Result<(), Box<dyn Error>> {
let s = NamedNodeRef::new_unchecked("http://example.com/s"); let s = NamedNodeRef::new_unchecked("http://example.com/s");
let p = NamedNodeRef::new_unchecked("http://example.com/p"); let p = NamedNodeRef::new_unchecked("http://example.com/p");
@ -468,7 +468,7 @@ fn test_read_only() -> Result<(), Box<dyn Error>> {
} }
// We open as read-only // We open as read-only
let read_only = Store::open_read_only(&store_dir)?; let read_only = Store::open_read_only(&store_dir, None)?;
assert!(read_only.contains(first_quad)?); assert!(read_only.contains(first_quad)?);
assert_eq!( assert_eq!(
read_only.iter().collect::<Result<Vec<_>, _>>()?, read_only.iter().collect::<Result<Vec<_>, _>>()?,
@ -491,18 +491,18 @@ fn test_read_only() -> Result<(), Box<dyn Error>> {
} }
#[test] #[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
fn test_open_read_only_bad_dir() -> Result<(), Box<dyn Error>> { fn test_open_read_only_bad_dir() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default(); let dir = TempDir::default();
create_dir_all(&dir.0)?; create_dir_all(&dir.0)?;
{ {
File::create(dir.0.join("CURRENT"))?.write_all(b"foo")?; File::create(dir.0.join("CURRENT"))?.write_all(b"foo")?;
} }
assert!(Store::open_read_only(&dir).is_err()); assert!(Store::open_read_only(&dir, None).is_err());
Ok(()) Ok(())
} }
#[cfg(all(target_os = "linux", feature = "rocksdb"))] #[cfg(all(target_os = "linux"))]
fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> { fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> {
assert!(Command::new("git") assert!(Command::new("git")
.args(["clean", "-fX", dir]) .args(["clean", "-fX", dir])
@ -515,24 +515,24 @@ fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> {
Ok(()) Ok(())
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
struct TempDir(PathBuf); struct TempDir(PathBuf);
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl Default for TempDir { impl Default for TempDir {
fn default() -> Self { fn default() -> Self {
Self(temp_dir().join(format!("oxigraph-test-{}", random::<u128>()))) Self(temp_dir().join(format!("oxigraph-test-{}", random::<u128>())))
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl AsRef<Path> for TempDir { impl AsRef<Path> for TempDir {
fn as_ref(&self) -> &Path { fn as_ref(&self) -> &Path {
&self.0 &self.0
} }
} }
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] #[cfg(all(not(target_family = "wasm")))]
impl Drop for TempDir { impl Drop for TempDir {
fn drop(&mut self) { fn drop(&mut self) {
if self.0.is_dir() { if self.0.is_dir() {

Loading…
Cancel
Save