diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 706c5f85..d74e59bd 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -3,7 +3,7 @@ #![allow(unsafe_code, trivial_casts)] use crate::storage::error::StorageError; -use crate::store::CorruptionError; +use crate::store::{CorruptionError, StoreOpenOptions}; use lazy_static::lazy_static; use libc::{self, c_char, c_void, free}; use oxrocksdb_sys::*; @@ -25,6 +25,13 @@ use std::sync::Arc; use std::thread::{available_parallelism, yield_now}; use std::{ptr, slice}; +#[derive(PartialEq, Eq)] +enum OpeningMode { + InMemory, + Primary, + Secondary(PathBuf), +} + macro_rules! ffi_result { ( $($function:ident)::*() ) => { ffi_result_impl!($($function)::*()) @@ -69,6 +76,28 @@ lazy_static! { }; } +fn primary_db_handler_or_error( + db: &InnerDbHandler, +) -> Result<&InnerDbHandlerPrimary, StorageError> { + match db { + InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) => Ok(inner_db), + _ => Err(StorageError::Other( + "Database is opened as secondary, can not execute this operation.".into(), + )), + } +} + +fn secondary_db_handler_or_error( + db: &InnerDbHandler, +) -> Result<&InnerDbHandlerSecondary, StorageError> { + match db { + InnerDbHandler::Secondary(inner_db) => Ok(inner_db), + _ => Err(StorageError::Other(Box::::from( + "Expected secondary InnerDbHandler, got primary/in memory one", + ))), + } +} + pub struct ColumnFamilyDefinition { pub name: &'static str, pub use_iter: bool, @@ -84,11 +113,44 @@ unsafe impl Send for Db {} unsafe impl Sync for Db {} -struct DbHandler { +/// Handler that has read only access to the database +struct InnerDbHandlerSecondary { + db: *mut rocksdb_t, + primary_path: PathBuf, +} + +/// Handler that has read and write access to the database +struct InnerDbHandlerPrimary { db: *mut rocksdb_transactiondb_t, - options: *mut rocksdb_options_t, transaction_options: *mut rocksdb_transaction_options_t, transactiondb_options: *mut rocksdb_transactiondb_options_t, + path: PathBuf, +} + +enum InnerDbHandler { + Primary(InnerDbHandlerPrimary), + InMemory(InnerDbHandlerPrimary), + Secondary(InnerDbHandlerSecondary), +} + +impl InnerDbHandler { + fn is_null(&self) -> bool { + match self { + Self::Primary(s) | Self::InMemory(s) => s.db.is_null(), + Self::Secondary(s) => s.db.is_null(), + } + } + fn primary_path(&self) -> &PathBuf { + match self { + Self::Primary(s) | Self::InMemory(s) => &s.path, + Self::Secondary(s) => &s.primary_path, + } + } +} + +struct DbHandler { + db: InnerDbHandler, + options: *mut rocksdb_options_t, read_options: *mut rocksdb_readoptions_t, write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, @@ -99,8 +161,6 @@ struct DbHandler { column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_options: Vec<*mut rocksdb_options_t>, - path: PathBuf, - in_memory: bool, } impl Drop for DbHandler { @@ -109,7 +169,14 @@ impl Drop for DbHandler { for cf_handle in &self.cf_handles { rocksdb_column_family_handle_destroy(*cf_handle); } - rocksdb_transactiondb_close(self.db); + match &self.db { + InnerDbHandler::Primary(s) | InnerDbHandler::InMemory(s) => { + rocksdb_transactiondb_close(s.db); + } + InnerDbHandler::Secondary(s) => { + rocksdb_close(s.db); + } + } for cf_option in &self.cf_options { rocksdb_options_destroy(*cf_option); } @@ -119,13 +186,19 @@ impl Drop for DbHandler { rocksdb_envoptions_destroy(self.env_options); rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options); rocksdb_compactoptions_destroy(self.compaction_options); - rocksdb_transaction_options_destroy(self.transaction_options); - rocksdb_transactiondb_options_destroy(self.transactiondb_options); + if let InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) = &self.db + { + rocksdb_transaction_options_destroy(inner_db.transaction_options); + rocksdb_transactiondb_options_destroy(inner_db.transactiondb_options); + } rocksdb_options_destroy(self.options); rocksdb_block_based_options_destroy(self.block_based_table_options); } - if self.in_memory && self.path.exists() { - remove_dir_all(&self.path).unwrap(); + + if let InnerDbHandler::InMemory(db) = &self.db { + if db.path.exists() { + remove_dir_all(&db.path).unwrap(); + } } } } @@ -138,7 +211,11 @@ impl Db { temp_dir() } .join(format!("oxigraph-rocksdb-{}", random::())); - Ok(Self(Arc::new(Self::do_open(path, column_families, true)?))) + Ok(Self(Arc::new(Self::do_open( + path, + column_families, + &OpeningMode::InMemory, + )?))) } pub fn open( @@ -148,17 +225,29 @@ impl Db { Ok(Self(Arc::new(Self::do_open( path.to_owned(), column_families, - false, + &OpeningMode::Primary, )?))) } + pub fn open_with_options( + options: StoreOpenOptions, + column_families: Vec, + ) -> Result { + match options { + StoreOpenOptions::OpenAsSecondary(options) => Ok(Self(Arc::new(Self::do_open( + options.path, + column_families, + &OpeningMode::Secondary(options.secondary_path), + )?))), + } + } + fn do_open( path: PathBuf, mut column_families: Vec, - in_memory: bool, + open_mode: &OpeningMode, ) -> Result { let c_path = path_to_cstring(&path)?; - unsafe { let options = rocksdb_options_create(); assert!(!options.is_null(), "rocksdb_options_create returned null"); @@ -170,28 +259,32 @@ impl Db { available_parallelism()?.get().try_into().unwrap(), ); if let Some(available_fd) = available_file_descriptors()? { - if available_fd < 96 { - rocksdb_options_destroy(options); - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Oxigraph needs at least 96 file descriptors, only {available_fd} allowed. Run e.g. `ulimit -n 512` to allow 512 opened files" - - ), - ) - .into()); - } - rocksdb_options_set_max_open_files( - options, - (available_fd - 48).try_into().unwrap(), - ); + let max_open_files = match &open_mode { + OpeningMode::Primary | OpeningMode::InMemory => { + if available_fd < 96 { + rocksdb_options_destroy(options); + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Oxigraph needs at least 96 file descriptors, \ + only {available_fd} allowed. \ + Run e.g. `ulimit -n 512` to allow 512 opened files" + ), + ) + .into()); + } + (available_fd - 48).try_into().unwrap() + } + OpeningMode::Secondary(_) => -1, + }; + rocksdb_options_set_max_open_files(options, max_open_files); } rocksdb_options_set_info_log_level(options, 2); // We only log warnings rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files rocksdb_options_set_compression( options, - if in_memory { + if open_mode == &OpeningMode::InMemory { rocksdb_no_compression } else { rocksdb_lz4_compression @@ -201,7 +294,7 @@ impl Db { ); rocksdb_options_set_env( options, - if in_memory { + if open_mode == &OpeningMode::InMemory { ROCKSDB_MEM_ENV.0 } else { ROCKSDB_ENV.0 @@ -219,12 +312,6 @@ impl Db { ); rocksdb_options_set_block_based_table_factory(options, block_based_table_options); - let transactiondb_options = rocksdb_transactiondb_options_create(); - assert!( - !transactiondb_options.is_null(), - "rocksdb_transactiondb_options_create returned null" - ); - if !column_families.iter().any(|c| c.name == "default") { column_families.push(ColumnFamilyDefinition { name: "default", @@ -261,24 +348,76 @@ impl Db { let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = vec![ptr::null_mut(); column_family_names.len()]; - let db = ffi_result!(rocksdb_transactiondb_open_column_families_with_status( - options, - transactiondb_options, - c_path.as_ptr(), - c_column_families.len().try_into().unwrap(), - c_column_families - .iter() - .map(|cf| cf.as_ptr()) - .collect::>() - .as_ptr(), - cf_options.as_ptr() as *const *const rocksdb_options_t, - cf_handles.as_mut_ptr(), - )) + + let c_num_column_families = c_column_families.len().try_into().unwrap(); + let c_column_family_names = c_column_families + .iter() + .map(|cf| cf.as_ptr()) + .collect::>(); + + let db = match &open_mode { + OpeningMode::Primary | OpeningMode::InMemory => { + let transactiondb_options = rocksdb_transactiondb_options_create(); + assert!( + !transactiondb_options.is_null(), + "rocksdb_transactiondb_options_create returned null" + ); + + ffi_result!(rocksdb_transactiondb_open_column_families_with_status( + options, + transactiondb_options, + c_path.as_ptr(), + c_num_column_families, + c_column_family_names.as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) + .map(|db| { + let transaction_options = rocksdb_transaction_options_create(); + assert!( + !transaction_options.is_null(), + "rocksdb_transaction_options_create returned null" + ); + rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); + let primary_db_handler = InnerDbHandlerPrimary { + db, + transaction_options, + transactiondb_options, + path, + }; + match open_mode { + OpeningMode::InMemory => InnerDbHandler::InMemory(primary_db_handler), + OpeningMode::Primary => InnerDbHandler::Primary(primary_db_handler), + _ => unreachable!(), + } + }) + .map_err(|e| { + rocksdb_transactiondb_options_destroy(transactiondb_options); + e + }) + } + OpeningMode::Secondary(secondary_path) => { + ffi_result!(rocksdb_open_as_secondary_column_families_with_status( + options, + c_path.as_ptr(), + path_to_cstring(secondary_path)?.as_ptr(), + c_num_column_families, + c_column_family_names.as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) + .map(|db| { + InnerDbHandler::Secondary(InnerDbHandlerSecondary { + db, + primary_path: path, + }) + }) + } + } .map_err(|e| { for cf_option in &cf_options { rocksdb_options_destroy(*cf_option); } - rocksdb_transactiondb_options_destroy(transactiondb_options); rocksdb_options_destroy(options); rocksdb_block_based_options_destroy(block_based_table_options); e @@ -302,7 +441,7 @@ impl Db { !write_options.is_null(), "rocksdb_writeoptions_create returned null" ); - if in_memory { + if open_mode == &OpeningMode::InMemory { rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL } @@ -330,18 +469,9 @@ impl Db { "rocksdb_compactoptions_create returned null" ); - let transaction_options = rocksdb_transaction_options_create(); - assert!( - !transaction_options.is_null(), - "rocksdb_transaction_options_create returned null" - ); - rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); - Ok(DbHandler { db, options, - transaction_options, - transactiondb_options, read_options, write_options, flush_options, @@ -352,8 +482,6 @@ impl Db { column_family_names, cf_handles, cf_options, - path, - in_memory, }) } } @@ -370,19 +498,35 @@ impl Db { #[must_use] pub fn snapshot(&self) -> Reader { unsafe { - let snapshot = rocksdb_transactiondb_create_snapshot(self.0.db); - assert!( - !snapshot.is_null(), - "rocksdb_transactiondb_create_snapshot returned null" - ); let options = rocksdb_readoptions_create_copy(self.0.read_options); - rocksdb_readoptions_set_snapshot(options, snapshot); - Reader { - inner: InnerReader::Snapshot(Rc::new(InnerSnapshot { - db: self.0.clone(), - snapshot, - })), - options, + match &self.0.db { + InnerDbHandler::InMemory(inner_db_handler) + | InnerDbHandler::Primary(inner_db_handler) => { + let snapshot = rocksdb_transactiondb_create_snapshot(inner_db_handler.db); + assert!( + !snapshot.is_null(), + "rocksdb_transactiondb_create_snapshot returned null" + ); + rocksdb_readoptions_set_snapshot(options, snapshot); + Reader { + inner: InnerReader::Snapshot(Rc::new(InnerSnapshot { + db: self.0.clone(), + snapshot, + })), + options, + } + } + InnerDbHandler::Secondary(_) => { + ffi_result!(rocksdb_try_catch_up_with_primary_with_status( + secondary_db_handler_or_error(&self.0.db).unwrap().db, + )) + .unwrap(); + + Reader { + inner: InnerReader::Secondary(self.0.clone()), + options, + } + } } } } @@ -393,10 +537,11 @@ impl Db { ) -> Result { loop { let transaction = unsafe { + let db = primary_db_handler_or_error(&self.0.db)?; let transaction = rocksdb_transaction_begin( - self.0.db, + db.db, self.0.write_options, - self.0.transaction_options, + db.transaction_options, ptr::null_mut(), ); assert!( @@ -464,13 +609,27 @@ impl Db { key: &[u8], ) -> Result, StorageError> { unsafe { - let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - self.0.db, - self.0.read_options, - column_family.0, - key.as_ptr() as *const c_char, - key.len() - ))?; + let slice = match &self.0.db { + InnerDbHandler::Secondary(inner_db_handler) => { + ffi_result!(rocksdb_get_pinned_cf_with_status( + inner_db_handler.db, + self.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + ))? + } + InnerDbHandler::Primary(inner_db_handler) + | InnerDbHandler::InMemory(inner_db_handler) => { + ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( + inner_db_handler.db, + self.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len() + ))? + } + }; Ok(if slice.is_null() { None } else { @@ -495,7 +654,7 @@ impl Db { ) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_put_cf_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, self.0.write_options, column_family.0, key.as_ptr() as *const c_char, @@ -510,7 +669,7 @@ impl Db { pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_flush_cf_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, self.0.flush_options, column_family.0, ))?; @@ -522,7 +681,7 @@ impl Db { pub fn compact(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, column_family.0, self.0.compaction_options, ptr::null(), @@ -536,7 +695,7 @@ impl Db { pub fn new_sst_file(&self) -> Result { unsafe { - let path = self.0.path.join(random::().to_string()); + let path = self.0.db.primary_path().join(random::().to_string()); let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options); ffi_result!(rocksdb_sstfilewriter_open_with_status( writer, @@ -576,7 +735,7 @@ impl Db { .collect::>(); unsafe { ffi_result!(rocksdb_transactiondb_ingest_external_files_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, args.as_ptr(), args.len() ))?; @@ -585,20 +744,25 @@ impl Db { } pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { - if self.0.in_memory { - return Err(StorageError::Other( - "It is not possible to backup an in-memory database created with `Store::open`" + match &self.0.db { + InnerDbHandler::Secondary(_) => Err(StorageError::Other( + "It is not possible to backup an database opened as secondary.".into(), + )), + InnerDbHandler::InMemory(_) => Err(StorageError::Other( + "It is not possible to backup an in-memory database created with `Store::new`" .into(), - )); - } - let path = path_to_cstring(target_directory)?; - unsafe { - ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status( - self.0.db, - path.as_ptr() - ))?; + )), + InnerDbHandler::Primary(db) => { + let path = path_to_cstring(target_directory)?; + unsafe { + ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status( + db.db, + path.as_ptr() + ))?; + } + Ok(()) + } } - Ok(()) } } @@ -619,16 +783,24 @@ pub struct Reader { enum InnerReader { Snapshot(Rc), Transaction(Weak<*mut rocksdb_transaction_t>), + Secondary(Arc), // TODO: we should not hold the whole enum, because we have to use + // secondary_db_handler_or_error to access rocksdb_t. we should + // safe it directly?! } struct InnerSnapshot { - db: Arc, + db: Arc, // TODO: probably the same as InnerReader::Secondary snapshot: *const rocksdb_snapshot_t, } impl Drop for InnerSnapshot { fn drop(&mut self) { - unsafe { rocksdb_transactiondb_release_snapshot(self.db.db, self.snapshot) } + unsafe { + rocksdb_transactiondb_release_snapshot( + primary_db_handler_or_error(&self.db.db).unwrap().db, + self.snapshot, + ) + } } } @@ -657,7 +829,7 @@ impl Reader { let slice = match &self.inner { InnerReader::Snapshot(inner) => { ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - inner.db.db, + primary_db_handler_or_error(&inner.db.db)?.db, self.options, column_family.0, key.as_ptr() as *const c_char, @@ -679,6 +851,11 @@ impl Reader { )); } } + InnerReader::Secondary(_) => { + return Err(StorageError::Other( + "Database is opened as secondary, can not execute Reader.get".into(), + )) + } }?; Ok(if slice.is_null() { None @@ -737,9 +914,11 @@ impl Reader { ); } let iter = match &self.inner { - InnerReader::Snapshot(inner) => { - rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0) - } + InnerReader::Snapshot(inner) => rocksdb_transactiondb_create_iterator_cf( + primary_db_handler_or_error(&inner.db.db)?.db, + options, + column_family.0, + ), InnerReader::Transaction(inner) => { if let Some(inner) = inner.upgrade() { rocksdb_transaction_create_iterator_cf(*inner, options, column_family.0) @@ -749,6 +928,10 @@ impl Reader { )); } } + InnerReader::Secondary(inner) => { + let db = secondary_db_handler_or_error(&inner.db)?.db; + rocksdb_create_iterator_cf(db, options, column_family.0) + } }; assert!(!iter.is_null(), "rocksdb_create_iterator returned null"); if prefix.is_empty() { diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index b3bf01a9..0c86d3e5 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -14,6 +14,7 @@ pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, S #[cfg(not(target_family = "wasm"))] use crate::storage::numeric_encoder::Decoder; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; +pub use crate::store::StoreOpenOptions; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; #[cfg(not(target_family = "wasm"))] use std::cmp::{max, min}; @@ -90,6 +91,11 @@ impl Storage { Self::setup(Db::open(path, Self::column_families())?) } + #[cfg(not(target_family = "wasm"))] + pub fn open_with_options(options: StoreOpenOptions) -> Result { + Self::setup(Db::open_with_options(options, Self::column_families())?) + } + fn column_families() -> Vec { vec![ ColumnFamilyDefinition { diff --git a/lib/src/store.rs b/lib/src/store.rs index 7e6974f4..9b8cfba5 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -43,8 +43,18 @@ use std::error::Error; use std::io::{BufRead, Write}; #[cfg(not(target_family = "wasm"))] use std::path::Path; +use std::path::PathBuf; use std::{fmt, str}; +pub struct SecondaryOptions { + pub path: PathBuf, + pub secondary_path: PathBuf, +} + +pub enum StoreOpenOptions { + OpenAsSecondary(SecondaryOptions), +} + /// An on-disk [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset). /// Allows to query and update it using SPARQL. /// It is based on the [RocksDB](https://rocksdb.org/) key-value store. @@ -102,6 +112,14 @@ impl Store { }) } + /// Opens a [`Store`] with options + #[cfg(not(target_family = "wasm"))] + pub fn open_with_options(options: StoreOpenOptions) -> Result { + Ok(Self { + storage: Storage::open_with_options(options)?, + }) + } + /// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/). /// /// Usage example: diff --git a/oxrocksdb-sys/api/c.cc b/oxrocksdb-sys/api/c.cc index 287e29fe..f9b660f0 100644 --- a/oxrocksdb-sys/api/c.cc +++ b/oxrocksdb-sys/api/c.cc @@ -18,6 +18,60 @@ static bool SaveStatus(rocksdb_status_t* target, const Status source) { extern "C" { +rocksdb_pinnableslice_t* rocksdb_get_pinned_cf_with_status( + rocksdb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, rocksdb_status_t* statusptr) { + rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); + Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), + &v->rep); + if (!s.ok()) { + delete v; + if (!s.IsNotFound()) { + SaveStatus(statusptr, s); + } + return nullptr; + } + return v; +} + + +void rocksdb_try_catch_up_with_primary_with_status( + rocksdb_t* db, rocksdb_status_t* statusptr) { + SaveStatus(statusptr, db->rep->TryCatchUpWithPrimary()); +} + +rocksdb_t* rocksdb_open_as_secondary_column_families_with_status( + const rocksdb_options_t* db_options, const char* name, + const char* secondary_path, int num_column_families, + const char* const* column_family_names, + const rocksdb_options_t* const* column_family_options, + rocksdb_column_family_handle_t** column_family_handles, + rocksdb_status_t* statusptr) { + std::vector column_families; + for (int i = 0; i != num_column_families; ++i) { + column_families.emplace_back( + std::string(column_family_names[i]), + ColumnFamilyOptions(column_family_options[i]->rep)); + } + DB* db; + std::vector handles; + if (SaveStatus(statusptr, DB::OpenAsSecondary(DBOptions(db_options->rep), + std::string(name), + std::string(secondary_path), + column_families, &handles, &db))) { + return nullptr; + } + for (size_t i = 0; i != handles.size(); ++i) { + rocksdb_column_family_handle_t* c_handle = + new rocksdb_column_family_handle_t; + c_handle->rep = handles[i]; + column_family_handles[i] = c_handle; + } + rocksdb_t* result = new rocksdb_t; + result->rep = db; + return result; +} rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status( const rocksdb_options_t* options, diff --git a/oxrocksdb-sys/api/c.h b/oxrocksdb-sys/api/c.h index 5b875a01..345d2200 100644 --- a/oxrocksdb-sys/api/c.h +++ b/oxrocksdb-sys/api/c.h @@ -65,6 +65,22 @@ typedef struct rocksdb_ingestexternalfilearg_t { rocksdb_ingestexternalfileoptions_t* options; } rocksdb_ingestexternalfilearg_t; +rocksdb_pinnableslice_t* rocksdb_get_pinned_cf_with_status( + rocksdb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, rocksdb_status_t* statusptr); + +extern ROCKSDB_LIBRARY_API void rocksdb_try_catch_up_with_primary_with_status( + rocksdb_t* db, rocksdb_status_t* statusptr); + +extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_open_as_secondary_column_families_with_status( + const rocksdb_options_t* options, const char* name, + const char* secondary_path, int num_column_families, + const char* const* column_family_names, + const rocksdb_options_t* const* column_family_options, + rocksdb_column_family_handle_t** column_family_handles, + rocksdb_status_t* statusptr); + extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status( const rocksdb_options_t* options, const rocksdb_transactiondb_options_t* txn_db_options, const char* name, diff --git a/server/src/main.rs b/server/src/main.rs index 79e9e71f..5fae8c3d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,7 +6,7 @@ use oxhttp::Server; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::sparql::{Query, QueryResults, Update}; -use oxigraph::store::{BulkLoader, LoaderError, Store}; +use oxigraph::store::{BulkLoader, LoaderError, SecondaryOptions, Store, StoreOpenOptions}; use oxiri::Iri; use rand::random; use rayon_core::ThreadPoolBuilder; @@ -37,6 +37,10 @@ struct Args { /// Directory in which persist the data. #[arg(short, long, global = true)] location: Option, + /// Open underlying database in readonly mode, specify path to store info logs (LOG) + // see https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances + #[arg(short, long, global = true)] + secondary_location: Option, #[command(subcommand)] command: Command, } @@ -67,7 +71,14 @@ enum Command { pub fn main() -> anyhow::Result<()> { let matches = Args::parse(); let store = if let Some(path) = &matches.location { - Store::open(path) + if let Some(secondary_path) = &matches.secondary_location { + Store::open_with_options(StoreOpenOptions::OpenAsSecondary(SecondaryOptions { + path: path.to_path_buf(), + secondary_path: secondary_path.to_path_buf(), + })) + } else { + Store::open(path) + } } else { Store::new() }?;