diff --git a/Cargo.lock b/Cargo.lock index 65925ffe..9db0c94e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -629,13 +629,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c2a198fb6b0eada2a8df47933734e6d35d350665a33a3593d7164fa52c75c19" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.4", ] [[package]] name = "librocksdb-sys" version = "0.11.0+8.3.2" -source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352" +source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be" dependencies = [ "bindgen", "bzip2-sys", @@ -1118,7 +1118,7 @@ checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" [[package]] name = "rocksdb" version = "0.21.0" -source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#cedbf494b4ec11638f1e0b7446731e0b73573352" +source = "git+https://git.nextgraph.org/NextGraph/rust-rocksdb.git?branch=master#9a1595d5cbf1d1d8a9b94346543a8ddae2bee0be" dependencies = [ "libc", "librocksdb-sys", diff --git a/Cargo.toml b/Cargo.toml index 6ce00d3b..4e8ea772 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ edition = "2021" rust-version = "1.70" [workspace.dependencies] +rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ] } anyhow = "1.0.72" arbitrary = "1.3" assert_cmd = "2.0" diff --git a/lib/oxigraph/Cargo.toml b/lib/oxigraph/Cargo.toml index 995cf1c4..52caf883 100644 --- a/lib/oxigraph/Cargo.toml +++ b/lib/oxigraph/Cargo.toml @@ -41,7 +41,7 @@ thiserror.workspace = true [target.'cfg(not(target_family = "wasm"))'.dependencies] libc = "0.2" -rocksdb = {git = "https://git.nextgraph.org/NextGraph/rust-rocksdb.git", branch = "master", features = [ ] } +rocksdb.workspace = true [target.'cfg(all(target_family = "wasm", target_os = "unknown"))'.dependencies] getrandom.workspace = true diff --git a/lib/oxigraph/src/storage/backend/oxi_rocksdb.rs b/lib/oxigraph/src/storage/backend/oxi_rocksdb.rs index 5c5b96fd..d0a39b75 100644 --- a/lib/oxigraph/src/storage/backend/oxi_rocksdb.rs +++ b/lib/oxigraph/src/storage/backend/oxi_rocksdb.rs @@ -155,13 +155,17 @@ impl Drop for RoDbHandler { } impl Db { - pub fn new(column_families: Vec) -> Result { - Self::open_read_write(None, column_families) + pub fn new( + column_families: Vec, + key: Option<[u8; 32]>, + ) -> Result { + Self::open_read_write(None, column_families, key) } pub fn open_read_write( path: Option<&Path>, column_families: Vec, + key: Option<[u8; 32]>, ) -> Result { let (path, in_memory) = if let Some(path) = path { (path.to_path_buf(), false) @@ -170,7 +174,7 @@ impl Db { }; let c_path = path_to_cstring(&path)?; unsafe { - let options = Self::db_options(true, in_memory)?; + let options = Self::db_options(true, in_memory, key)?; rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_missing_column_families(options, 1); rocksdb_options_set_compression( @@ -312,82 +316,83 @@ impl Db { } } - pub fn open_secondary( - primary_path: &Path, - secondary_path: Option<&Path>, - column_families: Vec, - ) -> Result { - let c_primary_path = path_to_cstring(primary_path)?; - let (secondary_path, in_memory) = if let Some(path) = secondary_path { - (path.to_path_buf(), false) - } else { - (tmp_path(), true) - }; - let c_secondary_path = path_to_cstring(&secondary_path)?; - unsafe { - let options = Self::db_options(false, false)?; - let (column_family_names, c_column_family_names, cf_options) = - Self::column_families_names_and_options(column_families, options); - let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = - vec![ptr::null_mut(); column_family_names.len()]; - let c_num_column_families = c_column_family_names.len().try_into().unwrap(); - let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status( - options, - c_primary_path.as_ptr(), - c_secondary_path.as_ptr(), - c_num_column_families, - c_column_family_names - .iter() - .map(|cf| cf.as_ptr()) - .collect::>() - .as_ptr(), - cf_options.as_ptr().cast(), - cf_handles.as_mut_ptr(), - )) - .map_err(|e| { - for cf_option in &cf_options { - rocksdb_options_destroy(*cf_option); - } - rocksdb_options_destroy(options); - e - })?; - assert!( - !db.is_null(), - "rocksdb_open_for_read_only_column_families_with_status returned null" - ); - for handle in &cf_handles { - assert!( - !handle.is_null(), - "rocksdb_open_for_read_only_column_families_with_status returned a null column family" - ); - } - let read_options = rocksdb_readoptions_create(); - assert!( - !read_options.is_null(), - "rocksdb_readoptions_create returned null" - ); - Ok(Self { - inner: DbKind::ReadOnly(Arc::new(RoDbHandler { - db, - options, - read_options, - column_family_names, - cf_handles, - cf_options, - is_secondary: true, - path_to_remove: in_memory.then_some(secondary_path), - })), - }) - } - } + // pub fn open_secondary( + // primary_path: &Path, + // secondary_path: Option<&Path>, + // column_families: Vec, + // ) -> Result { + // let c_primary_path = path_to_cstring(primary_path)?; + // let (secondary_path, in_memory) = if let Some(path) = secondary_path { + // (path.to_path_buf(), false) + // } else { + // (tmp_path(), true) + // }; + // let c_secondary_path = path_to_cstring(&secondary_path)?; + // unsafe { + // let options = Self::db_options(false, false)?; + // let (column_family_names, c_column_family_names, cf_options) = + // Self::column_families_names_and_options(column_families, options); + // let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = + // vec![ptr::null_mut(); column_family_names.len()]; + // let c_num_column_families = c_column_family_names.len().try_into().unwrap(); + // let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status( + // options, + // c_primary_path.as_ptr(), + // c_secondary_path.as_ptr(), + // c_num_column_families, + // c_column_family_names + // .iter() + // .map(|cf| cf.as_ptr()) + // .collect::>() + // .as_ptr(), + // cf_options.as_ptr().cast(), + // cf_handles.as_mut_ptr(), + // )) + // .map_err(|e| { + // for cf_option in &cf_options { + // rocksdb_options_destroy(*cf_option); + // } + // rocksdb_options_destroy(options); + // e + // })?; + // assert!( + // !db.is_null(), + // "rocksdb_open_for_read_only_column_families_with_status returned null" + // ); + // for handle in &cf_handles { + // assert!( + // !handle.is_null(), + // "rocksdb_open_for_read_only_column_families_with_status returned a null column family" + // ); + // } + // let read_options = rocksdb_readoptions_create(); + // assert!( + // !read_options.is_null(), + // "rocksdb_readoptions_create returned null" + // ); + // Ok(Self { + // inner: DbKind::ReadOnly(Arc::new(RoDbHandler { + // db, + // options, + // read_options, + // column_family_names, + // cf_handles, + // cf_options, + // is_secondary: true, + // path_to_remove: in_memory.then_some(secondary_path), + // })), + // }) + // } + // } pub fn open_read_only( path: &Path, column_families: Vec, + key: Option<[u8; 32]>, ) -> Result { unsafe { let c_path = path_to_cstring(path)?; - let options = Self::db_options(true, false)?; + let options = Self::db_options(true, false, key)?; let (column_family_names, c_column_family_names, cf_options) = Self::column_families_names_and_options(column_families, options); let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = @@ -495,7 +500,10 @@ impl Db { }) } else { ROCKSDB_ENV.get_or_init(|| { - let env = rocksdb_create_encrypted_env(opt_bytes_to_ptr(key.as_ref())); + let env = match key { + Some(_) => rocksdb_create_encrypted_env(opt_bytes_to_ptr(key.as_ref())), + None => rocksdb_create_default_env(), + }; assert!(!env.is_null(), "rocksdb_create_encrypted_env returned null"); UnsafeEnv(env) }) diff --git a/lib/oxigraph/src/storage/binary_encoder.rs b/lib/oxigraph/src/storage/binary_encoder.rs index dd853a08..1e789b7a 100644 --- a/lib/oxigraph/src/storage/binary_encoder.rs +++ b/lib/oxigraph/src/storage/binary_encoder.rs @@ -5,7 +5,7 @@ use oxsdatatypes::*; use std::io::Read; use std::mem::size_of; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] pub const LATEST_STORAGE_VERSION: u64 = 1; pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::() + 2 * size_of::(); diff --git a/lib/oxigraph/src/storage/mod.rs b/lib/oxigraph/src/storage/mod.rs index 2d0e5c31..e4215f63 100644 --- a/lib/oxigraph/src/storage/mod.rs +++ b/lib/oxigraph/src/storage/mod.rs @@ -1,9 +1,9 @@ #![allow(clippy::same_name_method)] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use crate::model::Quad; use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef}; use crate::storage::backend::{Reader, Transaction}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use crate::storage::binary_encoder::LATEST_STORAGE_VERSION; use crate::storage::binary_encoder::{ decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple, @@ -12,22 +12,22 @@ use crate::storage::binary_encoder::{ WRITTEN_TERM_MAX_SIZE, }; pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use crate::storage::numeric_encoder::Decoder; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::collections::VecDeque; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::collections::{HashMap, HashSet}; use std::error::Error; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::mem::{swap, take}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::path::{Path, PathBuf}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::sync::Mutex; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::{io, thread}; mod backend; @@ -47,16 +47,16 @@ const DSPO_CF: &str = "dspo"; const DPOS_CF: &str = "dpos"; const DOSP_CF: &str = "dosp"; const GRAPHS_CF: &str = "graphs"; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] const DEFAULT_CF: &str = "default"; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000; /// Low level storage primitives #[derive(Clone)] pub struct Storage { db: Db, - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] default_cf: ColumnFamily, id2str_cf: ColumnFamily, spog_cf: ColumnFamily, @@ -72,39 +72,43 @@ pub struct Storage { } impl Storage { - pub fn new() -> Result { - Self::setup(Db::new(Self::column_families())?) + pub fn new(key: Option<[u8; 32]>) -> Result { + Self::setup(Db::new(Self::column_families(), key)?) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open(path: &Path) -> Result { - Self::setup(Db::open_read_write(Some(path), Self::column_families())?) - } - - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open_secondary(primary_path: &Path) -> Result { - Self::setup(Db::open_secondary( - primary_path, - None, - Self::column_families(), - )?) - } - - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open_persistent_secondary( - primary_path: &Path, - secondary_path: &Path, - ) -> Result { - Self::setup(Db::open_secondary( - primary_path, - Some(secondary_path), + #[cfg(all(not(target_family = "wasm")))] + pub fn open(path: &Path, key: Option<[u8; 32]>) -> Result { + Self::setup(Db::open_read_write( + Some(path), Self::column_families(), + key, )?) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open_read_only(path: &Path) -> Result { - Self::setup(Db::open_read_only(path, Self::column_families())?) + // #[cfg(all(not(target_family = "wasm")))] + // pub fn open_secondary(primary_path: &Path) -> Result { + // Self::setup(Db::open_secondary( + // primary_path, + // None, + // Self::column_families(), + // )?) + // } + + // #[cfg(all(not(target_family = "wasm")))] + // pub fn open_persistent_secondary( + // primary_path: &Path, + // secondary_path: &Path, + // ) -> Result { + // Self::setup(Db::open_secondary( + // primary_path, + // Some(secondary_path), + // Self::column_families(), + // )?) + // } + + #[cfg(all(not(target_family = "wasm")))] + pub fn open_read_only(path: &Path, key: Option<[u8; 32]>) -> Result { + Self::setup(Db::open_read_only(path, Self::column_families(), key)?) } fn column_families() -> Vec { @@ -180,7 +184,7 @@ impl Storage { fn setup(db: Db) -> Result { let this = Self { - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] default_cf: db.column_family(DEFAULT_CF)?, id2str_cf: db.column_family(ID2STR_CF)?, spog_cf: db.column_family(SPOG_CF)?, @@ -195,12 +199,12 @@ impl Storage { graphs_cf: db.column_family(GRAPHS_CF)?, db, }; - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] this.migrate()?; Ok(this) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] fn migrate(&self) -> Result<(), StorageError> { let mut version = self.ensure_version()?; if version == 0 { @@ -240,7 +244,7 @@ impl Storage { } } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] fn ensure_version(&self) -> Result { Ok( if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? { @@ -254,7 +258,7 @@ impl Storage { ) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] fn update_version(&self, version: u64) -> Result<(), StorageError> { self.db .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; @@ -281,12 +285,12 @@ impl Storage { }) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn flush(&self) -> Result<(), StorageError> { self.db.flush() } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn compact(&self) -> Result<(), StorageError> { self.db.compact(&self.default_cf)?; self.db.compact(&self.gspo_cf)?; @@ -301,7 +305,7 @@ impl Storage { self.db.compact(&self.id2str_cf) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { self.db.backup(target_directory) } @@ -626,7 +630,7 @@ impl StorageReader { } } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn get_str(&self, key: &StrHash) -> Result, StorageError> { Ok(self .storage @@ -647,7 +651,7 @@ impl StorageReader { .map_err(CorruptionError::new)?) } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn contains_str(&self, key: &StrHash) -> Result { self.storage .db @@ -661,7 +665,7 @@ impl StorageReader { } /// Validates that all the storage invariants held in the data - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn validate(&self) -> Result<(), StorageError> { // triples let dspo_size = self.dspo_quads(&[]).count(); @@ -997,7 +1001,7 @@ impl<'a> StorageWriter<'a> { } } - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> { if self .storage @@ -1178,7 +1182,7 @@ impl<'a> StorageWriter<'a> { } } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] #[must_use] pub struct StorageBulkLoader { storage: Storage, @@ -1187,7 +1191,7 @@ pub struct StorageBulkLoader { max_memory_size: Option, } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl StorageBulkLoader { pub fn new(storage: Storage) -> Self { Self { @@ -1318,7 +1322,7 @@ impl StorageBulkLoader { } } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] struct FileBulkLoader<'a> { storage: &'a Storage, id2str: HashMap>, @@ -1327,7 +1331,7 @@ struct FileBulkLoader<'a> { graphs: HashSet, } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl<'a> FileBulkLoader<'a> { fn new(storage: &'a Storage, batch_size: usize) -> Self { Self { @@ -1533,7 +1537,7 @@ impl<'a> FileBulkLoader<'a> { } } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn map_thread_result(result: thread::Result) -> io::Result { result.map_err(|e| { io::Error::new( diff --git a/lib/oxigraph/src/store.rs b/lib/oxigraph/src/store.rs index fe857345..1b09e5bb 100644 --- a/lib/oxigraph/src/store.rs +++ b/lib/oxigraph/src/store.rs @@ -25,7 +25,7 @@ //! }; //! # Result::<_, Box>::Ok(()) //! ``` -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use crate::io::RdfParseError; use crate::io::{RdfFormat, RdfParser, RdfSerializer}; use crate::model::*; @@ -34,7 +34,7 @@ use crate::sparql::{ QueryResults, Update, UpdateOptions, }; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use crate::storage::StorageBulkLoader; use crate::storage::{ ChainedDecodingQuadIterator, DecodingGraphIterator, Storage, StorageReader, StorageWriter, @@ -42,7 +42,7 @@ use crate::storage::{ pub use crate::storage::{CorruptionError, LoaderError, SerializerError, StorageError}; use std::error::Error; use std::io::{Read, Write}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::path::Path; use std::{fmt, str}; @@ -91,7 +91,13 @@ impl Store { /// Creates a temporary [`Store`] that will be deleted after drop. pub fn new() -> Result { Ok(Self { - storage: Storage::new()?, + storage: Storage::new(None)?, + }) + } + + pub fn new_with_key(key: [u8; 32]) -> Result { + Ok(Self { + storage: Storage::new(Some(key))?, }) } @@ -100,30 +106,37 @@ impl Store { /// Only one read-write [`Store`] can exist at the same time. /// If you want to have extra [`Store`] instance opened on a same data /// use [`Store::open_secondary`] or [`Store::open_read_only`]. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn open(path: impl AsRef) -> Result { Ok(Self { - storage: Storage::open(path.as_ref())?, + storage: Storage::open(path.as_ref(), None)?, }) } - /// Opens a read-only clone of a running read-write [`Store`]. - /// - /// Changes done while this process is running will be replicated after a possible lag. - /// - /// It should only be used if a primary instance opened with [`Store::open`] is running at the same time. - /// `primary_path` must be the path of the primary instance. - /// This secondary instance will use temporary storage for the secondary instance cache. - /// If you prefer persistent storage use [`Store::open_persistent_secondary`]. - /// - /// If you want to simple read-only [`Store`] use [`Store::open_read_only`]. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open_secondary(primary_path: impl AsRef) -> Result { + #[cfg(all(not(target_family = "wasm")))] + pub fn open_with_key(path: impl AsRef, key: [u8; 32]) -> Result { Ok(Self { - storage: Storage::open_secondary(primary_path.as_ref())?, + storage: Storage::open(path.as_ref(), Some(key))?, }) } + // /// Opens a read-only clone of a running read-write [`Store`]. + // /// + // /// Changes done while this process is running will be replicated after a possible lag. + // /// + // /// It should only be used if a primary instance opened with [`Store::open`] is running at the same time. + // /// `primary_path` must be the path of the primary instance. + // /// This secondary instance will use temporary storage for the secondary instance cache. + // /// If you prefer persistent storage use [`Store::open_persistent_secondary`]. + // /// + // /// If you want to simple read-only [`Store`] use [`Store::open_read_only`]. + // #[cfg(all(not(target_family = "wasm")))] + // pub fn open_secondary(primary_path: impl AsRef) -> Result { + // Ok(Self { + // storage: Storage::open_secondary(primary_path.as_ref())?, + // }) + // } + /// Opens a read-only clone of a running read-write [`Store`] with persistence of the secondary instance cache. /// /// Changes done while this process is running will be replicated after a possible lag. @@ -132,7 +145,7 @@ impl Store { /// `primary_path` must be the path of the primary instance and `secondary_path` an other directory for the secondary instance cache. /// /// If you want to simple read-only [`Store`] use [`Store::open_read_only`]. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn open_persistent_secondary( primary_path: impl AsRef, secondary_path: impl AsRef, @@ -149,10 +162,13 @@ impl Store { /// /// Opening as read-only while having an other process writing the database is undefined behavior. /// [`Store::open_secondary`] should be used in this case. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] - pub fn open_read_only(path: impl AsRef) -> Result { + #[cfg(all(not(target_family = "wasm")))] + pub fn open_read_only( + path: impl AsRef, + key: Option<[u8; 32]>, + ) -> Result { Ok(Self { - storage: Storage::open_read_only(path.as_ref())?, + storage: Storage::open_read_only(path.as_ref(), key)?, }) } @@ -930,7 +946,7 @@ impl Store { /// Flushes all buffers and ensures that all writes are saved on disk. /// /// Flushes are automatically done using background threads but might lag a little bit. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn flush(&self) -> Result<(), StorageError> { self.storage.flush() } @@ -940,7 +956,7 @@ impl Store { /// Useful to call after a batch upload or another similar operation. /// ///
Can take hours on huge databases.
- #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn optimize(&self) -> Result<(), StorageError> { self.storage.compact() } @@ -963,7 +979,7 @@ impl Store { /// This allows cheap regular backups. /// /// If you want to move your data to another RDF storage system, you should have a look at the [`Store::dump_to_write`] function instead. - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn backup(&self, target_directory: impl AsRef) -> Result<(), StorageError> { self.storage.backup(target_directory.as_ref()) } @@ -990,7 +1006,7 @@ impl Store { /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// # Result::<_, Box>::Ok(()) /// ``` - #[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] + #[cfg(all(not(target_family = "wasm")))] pub fn bulk_loader(&self) -> BulkLoader { BulkLoader { storage: StorageBulkLoader::new(self.storage.clone()), @@ -1608,14 +1624,14 @@ impl Iterator for GraphNameIter { /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); /// # Result::<_, Box>::Ok(()) /// ``` -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] #[must_use] pub struct BulkLoader { storage: StorageBulkLoader, on_parse_error: Option Result<(), RdfParseError>>>, } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl BulkLoader { /// Sets the maximal number of threads to be used by the bulk loader per operation. /// diff --git a/lib/oxigraph/tests/store.rs b/lib/oxigraph/tests/store.rs index 89e69032..9ba1db56 100644 --- a/lib/oxigraph/tests/store.rs +++ b/lib/oxigraph/tests/store.rs @@ -5,22 +5,22 @@ use oxigraph::io::RdfFormat; use oxigraph::model::vocab::{rdf, xsd}; use oxigraph::model::*; use oxigraph::store::Store; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use rand::random; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::env::temp_dir; use std::error::Error; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::fs::{create_dir_all, remove_dir_all, File}; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::io::Write; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::iter::empty; -#[cfg(all(target_os = "linux", feature = "rocksdb"))] +#[cfg(all(target_os = "linux"))] use std::iter::once; -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] use std::path::{Path, PathBuf}; -#[cfg(all(target_os = "linux", feature = "rocksdb"))] +#[cfg(all(target_os = "linux"))] use std::process::Command; #[allow(clippy::non_ascii_literal)] @@ -121,7 +121,7 @@ fn test_load_graph() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bulk_load_graph() -> Result<(), Box> { let store = Store::new()?; store @@ -135,7 +135,7 @@ fn test_bulk_load_graph() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bulk_load_graph_lenient() -> Result<(), Box> { let store = Store::new()?; store.bulk_loader().on_parse_error(|_| Ok(())).load_from_read( @@ -154,7 +154,7 @@ fn test_bulk_load_graph_lenient() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bulk_load_empty() -> Result<(), Box> { let store = Store::new()?; store.bulk_loader().load_quads(empty::())?; @@ -177,7 +177,7 @@ fn test_load_dataset() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bulk_load_dataset() -> Result<(), Box> { let store = Store::new()?; store @@ -258,7 +258,7 @@ fn test_snapshot_isolation_iterator() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box> { let quad = QuadRef::new( NamedNodeRef::new_unchecked("http://example.com/s"), @@ -274,7 +274,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<(), Box Result<(), Box> { let dir = TempDir::default(); create_dir_all(&dir.0)?; @@ -286,7 +286,7 @@ fn test_open_bad_dir() -> Result<(), Box> { } #[test] -#[cfg(all(target_os = "linux", feature = "rocksdb"))] +#[cfg(all(target_os = "linux"))] fn test_bad_stt_open() -> Result<(), Box> { let dir = TempDir::default(); let store = Store::open(&dir.0)?; @@ -303,48 +303,48 @@ fn test_bad_stt_open() -> Result<(), Box> { Ok(()) } -#[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] -fn test_backup() -> Result<(), Box> { - let quad = QuadRef::new( - NamedNodeRef::new_unchecked("http://example.com/s"), - NamedNodeRef::new_unchecked("http://example.com/p"), - NamedNodeRef::new_unchecked("http://example.com/o"), - GraphNameRef::DefaultGraph, - ); - let store_dir = TempDir::default(); - let backup_from_rw_dir = TempDir::default(); - let backup_from_ro_dir = TempDir::default(); - let backup_from_secondary_dir = TempDir::default(); - - let store = Store::open(&store_dir)?; - store.insert(quad)?; - let secondary_store = Store::open_secondary(&store_dir)?; - store.flush()?; - - store.backup(&backup_from_rw_dir)?; - secondary_store.backup(&backup_from_secondary_dir)?; - store.remove(quad)?; - assert!(!store.contains(quad)?); - - let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?; - backup_from_rw.validate()?; - assert!(backup_from_rw.contains(quad)?); - backup_from_rw.backup(&backup_from_ro_dir)?; - - let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?; - backup_from_ro.validate()?; - assert!(backup_from_ro.contains(quad)?); - - let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?; - backup_from_secondary.validate()?; - assert!(backup_from_secondary.contains(quad)?); - - Ok(()) -} +// #[test] +// #[cfg(all(not(target_family = "wasm")))] +// fn test_backup() -> Result<(), Box> { +// let quad = QuadRef::new( +// NamedNodeRef::new_unchecked("http://example.com/s"), +// NamedNodeRef::new_unchecked("http://example.com/p"), +// NamedNodeRef::new_unchecked("http://example.com/o"), +// GraphNameRef::DefaultGraph, +// ); +// let store_dir = TempDir::default(); +// let backup_from_rw_dir = TempDir::default(); +// let backup_from_ro_dir = TempDir::default(); +// let backup_from_secondary_dir = TempDir::default(); + +// let store = Store::open(&store_dir)?; +// store.insert(quad)?; +// let secondary_store = Store::open_secondary(&store_dir)?; +// store.flush()?; + +// store.backup(&backup_from_rw_dir)?; +// secondary_store.backup(&backup_from_secondary_dir)?; +// store.remove(quad)?; +// assert!(!store.contains(quad)?); + +// let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?; +// backup_from_rw.validate()?; +// assert!(backup_from_rw.contains(quad)?); +// backup_from_rw.backup(&backup_from_ro_dir)?; + +// let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?; +// backup_from_ro.validate()?; +// assert!(backup_from_ro.contains(quad)?); + +// let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?; +// backup_from_secondary.validate()?; +// assert!(backup_from_secondary.contains(quad)?); + +// Ok(()) +// } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_bad_backup() -> Result<(), Box> { let store_dir = TempDir::default(); let backup_dir = TempDir::default(); @@ -355,7 +355,7 @@ fn test_bad_backup() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_backup_on_in_memory() -> Result<(), Box> { let backup_dir = TempDir::default(); Store::new()?.backup(&backup_dir).unwrap_err(); @@ -363,7 +363,7 @@ fn test_backup_on_in_memory() -> Result<(), Box> { } #[test] -#[cfg(all(target_os = "linux", feature = "rocksdb"))] +#[cfg(all(target_os = "linux"))] fn test_backward_compatibility() -> Result<(), Box> { // We run twice to check if data is properly saved and closed for _ in 0..2 { @@ -386,63 +386,63 @@ fn test_backward_compatibility() -> Result<(), Box> { Ok(()) } -#[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] -fn test_secondary() -> Result<(), Box> { - let quad = QuadRef::new( - NamedNodeRef::new_unchecked("http://example.com/s"), - NamedNodeRef::new_unchecked("http://example.com/p"), - NamedNodeRef::new_unchecked("http://example.com/o"), - GraphNameRef::DefaultGraph, - ); - let primary_dir = TempDir::default(); - - // We open the store - let primary = Store::open(&primary_dir)?; - let secondary = Store::open_secondary(&primary_dir)?; - - // We insert a quad - primary.insert(quad)?; - primary.flush()?; - - // It is readable from both stores - for store in &[&primary, &secondary] { - assert!(store.contains(quad)?); - assert_eq!( - store.iter().collect::, _>>()?, - vec![quad.into_owned()] - ); - } - - // We validate the states - primary.validate()?; - secondary.validate()?; - - // We close the primary store and remove its content - drop(primary); - remove_dir_all(&primary_dir)?; - - // We secondary store is still readable - assert!(secondary.contains(quad)?); - secondary.validate()?; - - Ok(()) -} - -#[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] -fn test_open_secondary_bad_dir() -> Result<(), Box> { - let primary_dir = TempDir::default(); - create_dir_all(&primary_dir.0)?; - { - File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?; - } - assert!(Store::open_secondary(&primary_dir).is_err()); - Ok(()) -} +// #[test] +// #[cfg(all(not(target_family = "wasm")))] +// fn test_secondary() -> Result<(), Box> { +// let quad = QuadRef::new( +// NamedNodeRef::new_unchecked("http://example.com/s"), +// NamedNodeRef::new_unchecked("http://example.com/p"), +// NamedNodeRef::new_unchecked("http://example.com/o"), +// GraphNameRef::DefaultGraph, +// ); +// let primary_dir = TempDir::default(); + +// // We open the store +// let primary = Store::open(&primary_dir)?; +// let secondary = Store::open_secondary(&primary_dir)?; + +// // We insert a quad +// primary.insert(quad)?; +// primary.flush()?; + +// // It is readable from both stores +// for store in &[&primary, &secondary] { +// assert!(store.contains(quad)?); +// assert_eq!( +// store.iter().collect::, _>>()?, +// vec![quad.into_owned()] +// ); +// } + +// // We validate the states +// primary.validate()?; +// secondary.validate()?; + +// // We close the primary store and remove its content +// drop(primary); +// remove_dir_all(&primary_dir)?; + +// // We secondary store is still readable +// assert!(secondary.contains(quad)?); +// secondary.validate()?; + +// Ok(()) +// } + +// #[test] +// #[cfg(all(not(target_family = "wasm")))] +// fn test_open_secondary_bad_dir() -> Result<(), Box> { +// let primary_dir = TempDir::default(); +// create_dir_all(&primary_dir.0)?; +// { +// File::create(primary_dir.0.join("CURRENT"))?.write_all(b"foo")?; +// } +// assert!(Store::open_secondary(&primary_dir).is_err()); +// Ok(()) +// } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_read_only() -> Result<(), Box> { let s = NamedNodeRef::new_unchecked("http://example.com/s"); let p = NamedNodeRef::new_unchecked("http://example.com/p"); @@ -468,7 +468,7 @@ fn test_read_only() -> Result<(), Box> { } // We open as read-only - let read_only = Store::open_read_only(&store_dir)?; + let read_only = Store::open_read_only(&store_dir, None)?; assert!(read_only.contains(first_quad)?); assert_eq!( read_only.iter().collect::, _>>()?, @@ -491,18 +491,18 @@ fn test_read_only() -> Result<(), Box> { } #[test] -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] fn test_open_read_only_bad_dir() -> Result<(), Box> { let dir = TempDir::default(); create_dir_all(&dir.0)?; { File::create(dir.0.join("CURRENT"))?.write_all(b"foo")?; } - assert!(Store::open_read_only(&dir).is_err()); + assert!(Store::open_read_only(&dir, None).is_err()); Ok(()) } -#[cfg(all(target_os = "linux", feature = "rocksdb"))] +#[cfg(all(target_os = "linux"))] fn reset_dir(dir: &str) -> Result<(), Box> { assert!(Command::new("git") .args(["clean", "-fX", dir]) @@ -515,24 +515,24 @@ fn reset_dir(dir: &str) -> Result<(), Box> { Ok(()) } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] struct TempDir(PathBuf); -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl Default for TempDir { fn default() -> Self { Self(temp_dir().join(format!("oxigraph-test-{}", random::()))) } } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl AsRef for TempDir { fn as_ref(&self) -> &Path { &self.0 } } -#[cfg(all(not(target_family = "wasm"), feature = "rocksdb"))] +#[cfg(all(not(target_family = "wasm")))] impl Drop for TempDir { fn drop(&mut self) { if self.0.is_dir() {