Compare commits

...

2 Commits

  1. 6
      Cargo.lock
  2. 1
      Cargo.toml
  3. 2
      lib/oxigraph/Cargo.toml
  4. 8
      lib/oxigraph/src/storage/backend/mod.rs
  5. 164
      lib/oxigraph/src/storage/backend/oxi_rocksdb.rs
  6. 2
      lib/oxigraph/src/storage/binary_encoder.rs
  7. 116
      lib/oxigraph/src/storage/mod.rs
  8. 74
      lib/oxigraph/src/store.rs
  9. 240
      lib/oxigraph/tests/store.rs

6
Cargo.lock generated

@ -629,13 +629,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19"
dependencies = [
"cfg-if",
"windows-targets 0.48.5",
"windows-targets 0.52.4",
]
[[package]]
name = "librocksdb-sys"
version = "0.11.0+8.3.2"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be"
dependencies = [
"bindgen",
"bzip2-sys",
@ -1118,7 +1118,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f"
[[package]]
name = "rocksdb"
version = "0.21.0"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352"
source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be"
dependencies = [
"libc",
"librocksdb-sys",

@ -23,6 +23,7 @@ edition = "2021"
rust-version = "1.70"
[workspace.dependencies]
rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ] }
anyhow = "1.0.72"
arbitrary = "1.3"
assert_cmd = "2.0"

@ -41,7 +41,7 @@ thiserror.workspace = true
[target.'cfg(not(target_family = "wasm"))'.dependencies]
libc = "0.2"
rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ], optional = true }
rocksdb.workspace = true
[target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies]
getrandom.workspace = true

@ -1,12 +1,12 @@
//! A storage backend
//! RocksDB is available, if not in memory
#[cfg(any(target_family = "wasm", not(feature = "rocksdb")))]
#[cfg(any(target_family = "wasm"))]
pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub use oxi_rocksdb::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, Transaction};
#[cfg(any(target_family = "wasm", not(feature = "rocksdb")))]
#[cfg(any(target_family = "wasm"))]
mod fallback;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
mod oxi_rocksdb;

@ -28,6 +28,13 @@ use std::sync::{Arc, OnceLock};
use std::thread::{available_parallelism, yield_now};
use std::{fmt, io, ptr, slice};
pub fn opt_bytes_to_ptr<T: AsRef<[u8]>>(opt: Option<T>) -> *const c_char {
match opt {
Some(v) => v.as_ref().as_ptr() as *const c_char,
None => ptr::null(),
}
}
macro_rules! ffi_result {
( $($function:ident)::*( $arg1:expr $(, $arg:expr)* $(,)? ) ) => {{
let mut status = rocksdb_status_t {
@ -148,13 +155,17 @@ impl Drop for RoDbHandler {
}
impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self, StorageError> {
Self::open_read_write(None, column_families)
pub fn new(
column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
Self::open_read_write(None, column_families, key)
}
pub fn open_read_write(
path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
let (path, in_memory) = if let Some(path) = path {
(path.to_path_buf(), false)
@ -163,7 +174,7 @@ impl Db {
};
let c_path = path_to_cstring(&path)?;
unsafe {
let options = Self::db_options(true, in_memory)?;
let options = Self::db_options(true, in_memory, key)?;
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1);
rocksdb_options_set_compression(
@ -305,82 +316,83 @@ impl Db {
}
}
pub fn open_secondary(
primary_path: &Path,
secondary_path: Option<&Path>,
column_families: Vec<ColumnFamilyDefinition>,
) -> Result<Self, StorageError> {
let c_primary_path = path_to_cstring(primary_path)?;
let (secondary_path, in_memory) = if let Some(path) = secondary_path {
(path.to_path_buf(), false)
} else {
(tmp_path(), true)
};
let c_secondary_path = path_to_cstring(&secondary_path)?;
unsafe {
let options = Self::db_options(false, false)?;
let (column_family_names, c_column_family_names, cf_options) =
Self::column_families_names_and_options(column_families, options);
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_family_names.len()];
let c_num_column_families = c_column_family_names.len().try_into().unwrap();
let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status(
options,
c_primary_path.as_ptr(),
c_secondary_path.as_ptr(),
c_num_column_families,
c_column_family_names
.iter()
.map(|cf| cf.as_ptr())
.collect::<Vec<_>>()
.as_ptr(),
cf_options.as_ptr().cast(),
cf_handles.as_mut_ptr(),
))
.map_err(|e| {
for cf_option in &cf_options {
rocksdb_options_destroy(*cf_option);
}
rocksdb_options_destroy(options);
e
})?;
assert!(
!db.is_null(),
"rocksdb_open_for_read_only_column_families_with_status returned null"
);
for handle in &cf_handles {
assert!(
!handle.is_null(),
"rocksdb_open_for_read_only_column_families_with_status returned a null column family"
);
}
let read_options = rocksdb_readoptions_create();
assert!(
!read_options.is_null(),
"rocksdb_readoptions_create returned null"
);
Ok(Self {
inner: DbKind::ReadOnly(Arc::new(RoDbHandler {
db,
options,
read_options,
column_family_names,
cf_handles,
cf_options,
is_secondary: true,
path_to_remove: in_memory.then_some(secondary_path),
})),
})
}
}
// pub fn open_secondary(
// primary_path: &Path,
// secondary_path: Option<&Path>,
// column_families: Vec<ColumnFamilyDefinition>,
// ) -> Result<Self, StorageError> {
// let c_primary_path = path_to_cstring(primary_path)?;
// let (secondary_path, in_memory) = if let Some(path) = secondary_path {
// (path.to_path_buf(), false)
// } else {
// (tmp_path(), true)
// };
// let c_secondary_path = path_to_cstring(&secondary_path)?;
// unsafe {
// let options = Self::db_options(false, false)?;
// let (column_family_names, c_column_family_names, cf_options) =
// Self::column_families_names_and_options(column_families, options);
// let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
// vec![ptr::null_mut(); column_family_names.len()];
// let c_num_column_families = c_column_family_names.len().try_into().unwrap();
// let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status(
// options,
// c_primary_path.as_ptr(),
// c_secondary_path.as_ptr(),
// c_num_column_families,
// c_column_family_names
// .iter()
// .map(|cf| cf.as_ptr())
// .collect::<Vec<_>>()
// .as_ptr(),
// cf_options.as_ptr().cast(),
// cf_handles.as_mut_ptr(),
// ))
// .map_err(|e| {
// for cf_option in &cf_options {
// rocksdb_options_destroy(*cf_option);
// }
// rocksdb_options_destroy(options);
// e
// })?;
// assert!(
// !db.is_null(),
// "rocksdb_open_for_read_only_column_families_with_status returned null"
// );
// for handle in &cf_handles {
// assert!(
// !handle.is_null(),
// "rocksdb_open_for_read_only_column_families_with_status returned a null column family"
// );
// }
// let read_options = rocksdb_readoptions_create();
// assert!(
// !read_options.is_null(),
// "rocksdb_readoptions_create returned null"
// );
// Ok(Self {
// inner: DbKind::ReadOnly(Arc::new(RoDbHandler {
// db,
// options,
// read_options,
// column_family_names,
// cf_handles,
// cf_options,
// is_secondary: true,
// path_to_remove: in_memory.then_some(secondary_path),
// })),
// })
// }
// }
pub fn open_read_only(
path: &Path,
column_families: Vec<ColumnFamilyDefinition>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
unsafe {
let c_path = path_to_cstring(path)?;
let options = Self::db_options(true, false)?;
let options = Self::db_options(true, false, key)?;
let (column_family_names, c_column_family_names, cf_options) =
Self::column_families_names_and_options(column_families, options);
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
@ -440,6 +452,7 @@ impl Db {
fn db_options(
limit_max_open_files: bool,
in_memory: bool,
key: Option<[u8; 32]>,
) -> Result<*mut rocksdb_options_t, StorageError> {
static ROCKSDB_ENV: OnceLock<UnsafeEnv> = OnceLock::new();
static ROCKSDB_MEM_ENV: OnceLock<UnsafeEnv> = OnceLock::new();
@ -487,8 +500,11 @@ impl Db {
})
} else {
ROCKSDB_ENV.get_or_init(|| {
let env = rocksdb_create_default_env();
assert!(!env.is_null(), "rocksdb_create_default_env returned null");
let env = match key {
Some(_) => rocksdb_create_encrypted_env(opt_bytes_to_ptr(key.as_ref())),
None => rocksdb_create_default_env(),
};
assert!(!env.is_null(), "rocksdb_create_encrypted_env returned null");
UnsafeEnv(env)
})
}

@ -5,7 +5,7 @@ use oxsdatatypes::*;
use std::io::Read;
use std::mem::size_of;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub const LATEST_STORAGE_VERSION: u64 = 1;
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();

@ -1,9 +1,9 @@
#![allow(clippy::same_name_method)]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use crate::model::Quad;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
use crate::storage::backend::{Reader, Transaction};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use crate::storage::binary_encoder::LATEST_STORAGE_VERSION;
use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
@ -12,22 +12,22 @@ use crate::storage::binary_encoder::{
WRITTEN_TERM_MAX_SIZE,
};
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use crate::storage::numeric_encoder::Decoder;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::collections::VecDeque;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::collections::{HashMap, HashSet};
use std::error::Error;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::mem::{swap, take};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::path::{Path, PathBuf};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::sync::Mutex;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::{io, thread};
mod backend;
@ -47,16 +47,16 @@ const DSPO_CF: &str = "dspo";
const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp";
const GRAPHS_CF: &str = "graphs";
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
const DEFAULT_CF: &str = "default";
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
/// Low level storage primitives
#[derive(Clone)]
pub struct Storage {
db: Db,
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
default_cf: ColumnFamily,
id2str_cf: ColumnFamily,
spog_cf: ColumnFamily,
@ -72,39 +72,43 @@ pub struct Storage {
}
impl Storage {
pub fn new() -> Result<Self, StorageError> {
Self::setup(Db::new(Self::column_families())?)
pub fn new(key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::new(Self::column_families(), key)?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open(path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_read_write(Some(path), Self::column_families())?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary(
primary_path,
None,
Self::column_families(),
)?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_persistent_secondary(
primary_path: &Path,
secondary_path: &Path,
) -> Result<Self, StorageError> {
Self::setup(Db::open_secondary(
primary_path,
Some(secondary_path),
#[cfg(all(not(target_family = "wasm")))]
pub fn open(path: &Path, key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::open_read_write(
Some(path),
Self::column_families(),
key,
)?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_read_only(path: &Path) -> Result<Self, StorageError> {
Self::setup(Db::open_read_only(path, Self::column_families())?)
// #[cfg(all(not(target_family = "wasm")))]
// pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
// Self::setup(Db::open_secondary(
// primary_path,
// None,
// Self::column_families(),
// )?)
// }
// #[cfg(all(not(target_family = "wasm")))]
// pub fn open_persistent_secondary(
// primary_path: &Path,
// secondary_path: &Path,
// ) -> Result<Self, StorageError> {
// Self::setup(Db::open_secondary(
// primary_path,
// Some(secondary_path),
// Self::column_families(),
// )?)
// }
#[cfg(all(not(target_family = "wasm")))]
pub fn open_read_only(path: &Path, key: Option<[u8; 32]>) -> Result<Self, StorageError> {
Self::setup(Db::open_read_only(path, Self::column_families(), key)?)
}
fn column_families() -> Vec<ColumnFamilyDefinition> {
@ -180,7 +184,7 @@ impl Storage {
fn setup(db: Db) -> Result<Self, StorageError> {
let this = Self {
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
default_cf: db.column_family(DEFAULT_CF)?,
id2str_cf: db.column_family(ID2STR_CF)?,
spog_cf: db.column_family(SPOG_CF)?,
@ -195,12 +199,12 @@ impl Storage {
graphs_cf: db.column_family(GRAPHS_CF)?,
db,
};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
this.migrate()?;
Ok(this)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn migrate(&self) -> Result<(), StorageError> {
let mut version = self.ensure_version()?;
if version == 0 {
@ -240,7 +244,7 @@ impl Storage {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn ensure_version(&self) -> Result<u64, StorageError> {
Ok(
if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
@ -254,7 +258,7 @@ impl Storage {
)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn update_version(&self, version: u64) -> Result<(), StorageError> {
self.db
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
@ -281,12 +285,12 @@ impl Storage {
})
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn flush(&self) -> Result<(), StorageError> {
self.db.flush()
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn compact(&self) -> Result<(), StorageError> {
self.db.compact(&self.default_cf)?;
self.db.compact(&self.gspo_cf)?;
@ -301,7 +305,7 @@ impl Storage {
self.db.compact(&self.id2str_cf)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
self.db.backup(target_directory)
}
@ -626,7 +630,7 @@ impl StorageReader {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
Ok(self
.storage
@ -647,7 +651,7 @@ impl StorageReader {
.map_err(CorruptionError::new)?)
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
self.storage
.db
@ -661,7 +665,7 @@ impl StorageReader {
}
/// Validates that all the storage invariants held in the data
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn validate(&self) -> Result<(), StorageError> {
// triples
let dspo_size = self.dspo_quads(&[]).count();
@ -997,7 +1001,7 @@ impl<'a> StorageWriter<'a> {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
if self
.storage
@ -1178,7 +1182,7 @@ impl<'a> StorageWriter<'a> {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
#[must_use]
pub struct StorageBulkLoader {
storage: Storage,
@ -1187,7 +1191,7 @@ pub struct StorageBulkLoader {
max_memory_size: Option<usize>,
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl StorageBulkLoader {
pub fn new(storage: Storage) -> Self {
Self {
@ -1318,7 +1322,7 @@ impl StorageBulkLoader {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
struct FileBulkLoader<'a> {
storage: &'a Storage,
id2str: HashMap<StrHash, Box<str>>,
@ -1327,7 +1331,7 @@ struct FileBulkLoader<'a> {
graphs: HashSet<EncodedTerm>,
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl<'a> FileBulkLoader<'a> {
fn new(storage: &'a Storage, batch_size: usize) -> Self {
Self {
@ -1533,7 +1537,7 @@ impl<'a> FileBulkLoader<'a> {
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn map_thread_result<R>(result: thread::Result<R>) -> io::Result<R> {
result.map_err(|e| {
io::Error::new(

@ -25,7 +25,7 @@
//! };
//! # Result::<_, Box<dyn std::error::Error>>::Ok(())
//! ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use crate::io::RdfParseError;
use crate::io::{RdfFormat, RdfParser, RdfSerializer};
use crate::model::*;
@ -34,7 +34,7 @@ use crate::sparql::{
QueryResults, Update, UpdateOptions,
};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use crate::storage::StorageBulkLoader;
use crate::storage::{
ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter,
@ -42,7 +42,7 @@ use crate::storage::{
pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError};
use std::error::Error;
use std::io::{Read, Write};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::path::Path;
use std::{fmt, str};
@ -91,7 +91,13 @@ impl Store {
/// Creates a temporary [`Store`] that will be deleted after drop.
pub fn new() -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::new()?,
storage: Storage::new(None)?,
})
}
pub fn new_with_key(key: [u8; 32]) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::new(Some(key))?,
})
}
@ -100,30 +106,37 @@ impl Store {
/// Only one read-write [`Store`] can exist at the same time.
/// If you want to have extra [`Store`] instance opened on a same data
/// use [`Store::open_secondary`] or [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn open(path: impl AsRef<Path>) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::open(path.as_ref())?,
storage: Storage::open(path.as_ref(), None)?,
})
}
/// Opens a read-only clone of a running read-write [`Store`].
///
/// Changes done while this process is running will be replicated after a possible lag.
///
/// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
/// `primary_path` must be the path of the primary instance.
/// This secondary instance will use temporary storage for the secondary instance cache.
/// If you prefer persistent storage use [`Store::open_persistent_secondary`].
///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_secondary(primary_path: impl AsRef<Path>) -> Result<Self, StorageError> {
#[cfg(all(not(target_family = "wasm")))]
pub fn open_with_key(path: impl AsRef<Path>, key: [u8; 32]) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::open_secondary(primary_path.as_ref())?,
storage: Storage::open(path.as_ref(), Some(key))?,
})
}
// /// Opens a read-only clone of a running read-write [`Store`].
// ///
// /// Changes done while this process is running will be replicated after a possible lag.
// ///
// /// It should only be used if a primary instance opened with [`Store::open`] is running at the same time.
// /// `primary_path` must be the path of the primary instance.
// /// This secondary instance will use temporary storage for the secondary instance cache.
// /// If you prefer persistent storage use [`Store::open_persistent_secondary`].
// ///
// /// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
// #[cfg(all(not(target_family = "wasm")))]
// pub fn open_secondary(primary_path: impl AsRef<Path>) -> Result<Self, StorageError> {
// Ok(Self {
// storage: Storage::open_secondary(primary_path.as_ref())?,
// })
// }
/// Opens a read-only clone of a running read-write [`Store`] with persistence of the secondary instance cache.
///
/// Changes done while this process is running will be replicated after a possible lag.
@ -132,7 +145,7 @@ impl Store {
/// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache.
///
/// If you want to simple read-only [`Store`] use [`Store::open_read_only`].
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn open_persistent_secondary(
primary_path: impl AsRef<Path>,
secondary_path: impl AsRef<Path>,
@ -149,10 +162,13 @@ impl Store {
///
/// Opening as read-only while having an other process writing the database is undefined behavior.
/// [`Store::open_secondary`] should be used in this case.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
pub fn open_read_only(path: impl AsRef<Path>) -> Result<Self, StorageError> {
#[cfg(all(not(target_family = "wasm")))]
pub fn open_read_only(
path: impl AsRef<Path>,
key: Option<[u8; 32]>,
) -> Result<Self, StorageError> {
Ok(Self {
storage: Storage::open_read_only(path.as_ref())?,
storage: Storage::open_read_only(path.as_ref(), key)?,
})
}
@ -930,7 +946,7 @@ impl Store {
/// Flushes all buffers and ensures that all writes are saved on disk.
///
/// Flushes are automatically done using background threads but might lag a little bit.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn flush(&self) -> Result<(), StorageError> {
self.storage.flush()
}
@ -940,7 +956,7 @@ impl Store {
/// Useful to call after a batch upload or another similar operation.
///
/// <div class="warning">Can take hours on huge databases.</div>
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn optimize(&self) -> Result<(), StorageError> {
self.storage.compact()
}
@ -963,7 +979,7 @@ impl Store {
/// This allows cheap regular backups.
///
/// If you want to move your data to another RDF storage system, you should have a look at the [`Store::dump_to_write`] function instead.
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn backup(&self, target_directory: impl AsRef<Path>) -> Result<(), StorageError> {
self.storage.backup(target_directory.as_ref())
}
@ -990,7 +1006,7 @@ impl Store {
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
pub fn bulk_loader(&self) -> BulkLoader {
BulkLoader {
storage: StorageBulkLoader::new(self.storage.clone()),
@ -1608,14 +1624,14 @@ impl Iterator for GraphNameIter {
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_, Box<dyn std::error::Error>>::Ok(())
/// ```
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
#[must_use]
pub struct BulkLoader {
storage: StorageBulkLoader,
on_parse_error: Option<Box<dyn Fn(RdfParseError) -> Result<(), RdfParseError>>>,
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl BulkLoader {
/// Sets the maximal number of threads to be used by the bulk loader per operation.
///

@ -5,22 +5,22 @@ use oxigraph::io::RdfFormat;
use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*;
use oxigraph::store::Store;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use rand::random;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::env::temp_dir;
use std::error::Error;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::fs::{create_dir_all, remove_dir_all, File};
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::io::Write;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::iter::empty;
#[cfg(all(target_os = "linux", feature = "rocksdb"))]
#[cfg(all(target_os = "linux"))]
use std::iter::once;
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
use std::path::{Path, PathBuf};
#[cfg(all(target_os = "linux", feature = "rocksdb"))]
#[cfg(all(target_os = "linux"))]
use std::process::Command;
#[allow(clippy::non_ascii_literal)]
@ -121,7 +121,7 @@ fn test_load_graph() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store
@ -135,7 +135,7 @@ fn test_bulk_load_graph() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.bulk_loader().on_parse_error(|_| Ok(())).load_from_read(
@ -154,7 +154,7 @@ fn test_bulk_load_graph_lenient() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_empty() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store.bulk_loader().load_quads(empty::<Quad>())?;
@ -177,7 +177,7 @@ fn test_load_dataset() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_dataset() -> Result<(), Box<dyn Error>> {
let store = Store::new()?;
store
@ -258,7 +258,7 @@ fn test_snapshot_isolation_iterator() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
@ -274,7 +274,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box<dy
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_open_bad_dir() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default();
create_dir_all(&dir.0)?;
@ -286,7 +286,7 @@ fn test_open_bad_dir() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(target_os = "linux", feature = "rocksdb"))]
#[cfg(all(target_os = "linux"))]
fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default();
let store = Store::open(&dir.0)?;
@ -303,48 +303,48 @@ fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
fn test_backup() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
GraphNameRef::DefaultGraph,
);
let store_dir = TempDir::default();
let backup_from_rw_dir = TempDir::default();
let backup_from_ro_dir = TempDir::default();
let backup_from_secondary_dir = TempDir::default();
let store = Store::open(&store_dir)?;
store.insert(quad)?;
let secondary_store = Store::open_secondary(&store_dir)?;
store.flush()?;
store.backup(&backup_from_rw_dir)?;
secondary_store.backup(&backup_from_secondary_dir)?;
store.remove(quad)?;
assert!(!store.contains(quad)?);
let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?;
backup_from_rw.validate()?;
assert!(backup_from_rw.contains(quad)?);
backup_from_rw.backup(&backup_from_ro_dir)?;
let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?;
backup_from_ro.validate()?;
assert!(backup_from_ro.contains(quad)?);
let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?;
backup_from_secondary.validate()?;
assert!(backup_from_secondary.contains(quad)?);
Ok(())
}
// #[test]
// #[cfg(all(not(target_family = "wasm")))]
// fn test_backup() -> Result<(), Box<dyn Error>> {
// let quad = QuadRef::new(
// NamedNodeRef::new_unchecked("http://example.com/s"),
// NamedNodeRef::new_unchecked("http://example.com/p"),
// NamedNodeRef::new_unchecked("http://example.com/o"),
// GraphNameRef::DefaultGraph,
// );
// let store_dir = TempDir::default();
// let backup_from_rw_dir = TempDir::default();
// let backup_from_ro_dir = TempDir::default();
// let backup_from_secondary_dir = TempDir::default();
// let store = Store::open(&store_dir)?;
// store.insert(quad)?;
// let secondary_store = Store::open_secondary(&store_dir)?;
// store.flush()?;
// store.backup(&backup_from_rw_dir)?;
// secondary_store.backup(&backup_from_secondary_dir)?;
// store.remove(quad)?;
// assert!(!store.contains(quad)?);
// let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?;
// backup_from_rw.validate()?;
// assert!(backup_from_rw.contains(quad)?);
// backup_from_rw.backup(&backup_from_ro_dir)?;
// let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?;
// backup_from_ro.validate()?;
// assert!(backup_from_ro.contains(quad)?);
// let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?;
// backup_from_secondary.validate()?;
// assert!(backup_from_secondary.contains(quad)?);
// Ok(())
// }
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_bad_backup() -> Result<(), Box<dyn Error>> {
let store_dir = TempDir::default();
let backup_dir = TempDir::default();
@ -355,7 +355,7 @@ fn test_bad_backup() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> {
let backup_dir = TempDir::default();
Store::new()?.backup(&backup_dir).unwrap_err();
@ -363,7 +363,7 @@ fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(target_os = "linux", feature = "rocksdb"))]
#[cfg(all(target_os = "linux"))]
fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {
// We run twice to check if data is properly saved and closed
for _ in 0..2 {
@ -386,63 +386,63 @@ fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
fn test_secondary() -> Result<(), Box<dyn Error>> {
let quad = QuadRef::new(
NamedNodeRef::new_unchecked("http://example.com/s"),
NamedNodeRef::new_unchecked("http://example.com/p"),
NamedNodeRef::new_unchecked("http://example.com/o"),
GraphNameRef::DefaultGraph,
);
let primary_dir = TempDir::default();
// We open the store
let primary = Store::open(&primary_dir)?;
let secondary = Store::open_secondary(&primary_dir)?;
// We insert a quad
primary.insert(quad)?;
primary.flush()?;
// It is readable from both stores
for store in &[&primary, &secondary] {
assert!(store.contains(quad)?);
assert_eq!(
store.iter().collect::<Result<Vec<_>, _>>()?,
vec![quad.into_owned()]
);
}
// We validate the states
primary.validate()?;
secondary.validate()?;
// We close the primary store and remove its content
drop(primary);
remove_dir_all(&primary_dir)?;
// We secondary store is still readable
assert!(secondary.contains(quad)?);
secondary.validate()?;
Ok(())
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> {
let primary_dir = TempDir::default();
create_dir_all(&primary_dir.0)?;
{
File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?;
}
assert!(Store::open_secondary(&primary_dir).is_err());
Ok(())
}
// #[test]
// #[cfg(all(not(target_family = "wasm")))]
// fn test_secondary() -> Result<(), Box<dyn Error>> {
// let quad = QuadRef::new(
// NamedNodeRef::new_unchecked("http://example.com/s"),
// NamedNodeRef::new_unchecked("http://example.com/p"),
// NamedNodeRef::new_unchecked("http://example.com/o"),
// GraphNameRef::DefaultGraph,
// );
// let primary_dir = TempDir::default();
// // We open the store
// let primary = Store::open(&primary_dir)?;
// let secondary = Store::open_secondary(&primary_dir)?;
// // We insert a quad
// primary.insert(quad)?;
// primary.flush()?;
// // It is readable from both stores
// for store in &[&primary, &secondary] {
// assert!(store.contains(quad)?);
// assert_eq!(
// store.iter().collect::<Result<Vec<_>, _>>()?,
// vec![quad.into_owned()]
// );
// }
// // We validate the states
// primary.validate()?;
// secondary.validate()?;
// // We close the primary store and remove its content
// drop(primary);
// remove_dir_all(&primary_dir)?;
// // We secondary store is still readable
// assert!(secondary.contains(quad)?);
// secondary.validate()?;
// Ok(())
// }
// #[test]
// #[cfg(all(not(target_family = "wasm")))]
// fn test_open_secondary_bad_dir() -> Result<(), Box<dyn Error>> {
// let primary_dir = TempDir::default();
// create_dir_all(&primary_dir.0)?;
// {
// File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?;
// }
// assert!(Store::open_secondary(&primary_dir).is_err());
// Ok(())
// }
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_read_only() -> Result<(), Box<dyn Error>> {
let s = NamedNodeRef::new_unchecked("http://example.com/s");
let p = NamedNodeRef::new_unchecked("http://example.com/p");
@ -468,7 +468,7 @@ fn test_read_only() -> Result<(), Box<dyn Error>> {
}
// We open as read-only
let read_only = Store::open_read_only(&store_dir)?;
let read_only = Store::open_read_only(&store_dir, None)?;
assert!(read_only.contains(first_quad)?);
assert_eq!(
read_only.iter().collect::<Result<Vec<_>, _>>()?,
@ -491,18 +491,18 @@ fn test_read_only() -> Result<(), Box<dyn Error>> {
}
#[test]
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
fn test_open_read_only_bad_dir() -> Result<(), Box<dyn Error>> {
let dir = TempDir::default();
create_dir_all(&dir.0)?;
{
File::create(dir.0.join("CURRENT"))?.write_all(b"foo")?;
}
assert!(Store::open_read_only(&dir).is_err());
assert!(Store::open_read_only(&dir, None).is_err());
Ok(())
}
#[cfg(all(target_os = "linux", feature = "rocksdb"))]
#[cfg(all(target_os = "linux"))]
fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> {
assert!(Command::new("git")
.args(["clean", "-fX", dir])
@ -515,24 +515,24 @@ fn reset_dir(dir: &str) -> Result<(), Box<dyn Error>> {
Ok(())
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
struct TempDir(PathBuf);
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl Default for TempDir {
fn default() -> Self {
Self(temp_dir().join(format!("oxigraph-test-{}", random::<u128>())))
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl AsRef<Path> for TempDir {
fn as_ref(&self) -> &Path {
&self.0
}
}
#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))]
#[cfg(all(not(target_family = "wasm")))]
impl Drop for TempDir {
fn drop(&mut self) {
if self.0.is_dir() {

Loading…
Cancel
Save