From aa9476b9ccc960dc0d4c2a30227827738b70aea8 Mon Sep 17 00:00:00 2001 From: Benedikt Seidl Date: Fri, 13 Jan 2023 16:23:23 +0100 Subject: [PATCH] Add option to open rocksdb in secondary mode The database can be opened once in primary mode, but may be opened multiple times in secondary mode. It's not possible to write data to the database in secondary mode. Secondary mode does not support checkpoints so there might be data inconsistencies when data is changed while said data is queried. It might happen that the result data mixes both elements before and after the change that can not happen in primary mode. --- lib/src/storage/backend/rocksdb.rs | 397 +++++++++++++++++++++-------- lib/src/storage/mod.rs | 6 + lib/src/store.rs | 18 ++ oxrocksdb-sys/api/c.cc | 54 ++++ oxrocksdb-sys/api/c.h | 16 ++ server/src/main.rs | 15 +- 6 files changed, 397 insertions(+), 109 deletions(-) diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 706c5f85..d74e59bd 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -3,7 +3,7 @@ #![allow(unsafe_code, trivial_casts)] use crate::storage::error::StorageError; -use crate::store::CorruptionError; +use crate::store::{CorruptionError, StoreOpenOptions}; use lazy_static::lazy_static; use libc::{self, c_char, c_void, free}; use oxrocksdb_sys::*; @@ -25,6 +25,13 @@ use std::sync::Arc; use std::thread::{available_parallelism, yield_now}; use std::{ptr, slice}; +#[derive(PartialEq, Eq)] +enum OpeningMode { + InMemory, + Primary, + Secondary(PathBuf), +} + macro_rules! ffi_result { ( $($function:ident)::*() ) => { ffi_result_impl!($($function)::*()) @@ -69,6 +76,28 @@ lazy_static! { }; } +fn primary_db_handler_or_error( + db: &InnerDbHandler, +) -> Result<&InnerDbHandlerPrimary, StorageError> { + match db { + InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) => Ok(inner_db), + _ => Err(StorageError::Other( + "Database is opened as secondary, can not execute this operation.".into(), + )), + } +} + +fn secondary_db_handler_or_error( + db: &InnerDbHandler, +) -> Result<&InnerDbHandlerSecondary, StorageError> { + match db { + InnerDbHandler::Secondary(inner_db) => Ok(inner_db), + _ => Err(StorageError::Other(Box::::from( + "Expected secondary InnerDbHandler, got primary/in memory one", + ))), + } +} + pub struct ColumnFamilyDefinition { pub name: &'static str, pub use_iter: bool, @@ -84,11 +113,44 @@ unsafe impl Send for Db {} unsafe impl Sync for Db {} -struct DbHandler { +/// Handler that has read only access to the database +struct InnerDbHandlerSecondary { + db: *mut rocksdb_t, + primary_path: PathBuf, +} + +/// Handler that has read and write access to the database +struct InnerDbHandlerPrimary { db: *mut rocksdb_transactiondb_t, - options: *mut rocksdb_options_t, transaction_options: *mut rocksdb_transaction_options_t, transactiondb_options: *mut rocksdb_transactiondb_options_t, + path: PathBuf, +} + +enum InnerDbHandler { + Primary(InnerDbHandlerPrimary), + InMemory(InnerDbHandlerPrimary), + Secondary(InnerDbHandlerSecondary), +} + +impl InnerDbHandler { + fn is_null(&self) -> bool { + match self { + Self::Primary(s) | Self::InMemory(s) => s.db.is_null(), + Self::Secondary(s) => s.db.is_null(), + } + } + fn primary_path(&self) -> &PathBuf { + match self { + Self::Primary(s) | Self::InMemory(s) => &s.path, + Self::Secondary(s) => &s.primary_path, + } + } +} + +struct DbHandler { + db: InnerDbHandler, + options: *mut rocksdb_options_t, read_options: *mut rocksdb_readoptions_t, write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, @@ -99,8 +161,6 @@ struct DbHandler { column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_options: Vec<*mut rocksdb_options_t>, - path: PathBuf, - in_memory: bool, } impl Drop for DbHandler { @@ -109,7 +169,14 @@ impl Drop for DbHandler { for cf_handle in &self.cf_handles { rocksdb_column_family_handle_destroy(*cf_handle); } - rocksdb_transactiondb_close(self.db); + match &self.db { + InnerDbHandler::Primary(s) | InnerDbHandler::InMemory(s) => { + rocksdb_transactiondb_close(s.db); + } + InnerDbHandler::Secondary(s) => { + rocksdb_close(s.db); + } + } for cf_option in &self.cf_options { rocksdb_options_destroy(*cf_option); } @@ -119,13 +186,19 @@ impl Drop for DbHandler { rocksdb_envoptions_destroy(self.env_options); rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options); rocksdb_compactoptions_destroy(self.compaction_options); - rocksdb_transaction_options_destroy(self.transaction_options); - rocksdb_transactiondb_options_destroy(self.transactiondb_options); + if let InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) = &self.db + { + rocksdb_transaction_options_destroy(inner_db.transaction_options); + rocksdb_transactiondb_options_destroy(inner_db.transactiondb_options); + } rocksdb_options_destroy(self.options); rocksdb_block_based_options_destroy(self.block_based_table_options); } - if self.in_memory && self.path.exists() { - remove_dir_all(&self.path).unwrap(); + + if let InnerDbHandler::InMemory(db) = &self.db { + if db.path.exists() { + remove_dir_all(&db.path).unwrap(); + } } } } @@ -138,7 +211,11 @@ impl Db { temp_dir() } .join(format!("oxigraph-rocksdb-{}", random::())); - Ok(Self(Arc::new(Self::do_open(path, column_families, true)?))) + Ok(Self(Arc::new(Self::do_open( + path, + column_families, + &OpeningMode::InMemory, + )?))) } pub fn open( @@ -148,17 +225,29 @@ impl Db { Ok(Self(Arc::new(Self::do_open( path.to_owned(), column_families, - false, + &OpeningMode::Primary, )?))) } + pub fn open_with_options( + options: StoreOpenOptions, + column_families: Vec, + ) -> Result { + match options { + StoreOpenOptions::OpenAsSecondary(options) => Ok(Self(Arc::new(Self::do_open( + options.path, + column_families, + &OpeningMode::Secondary(options.secondary_path), + )?))), + } + } + fn do_open( path: PathBuf, mut column_families: Vec, - in_memory: bool, + open_mode: &OpeningMode, ) -> Result { let c_path = path_to_cstring(&path)?; - unsafe { let options = rocksdb_options_create(); assert!(!options.is_null(), "rocksdb_options_create returned null"); @@ -170,28 +259,32 @@ impl Db { available_parallelism()?.get().try_into().unwrap(), ); if let Some(available_fd) = available_file_descriptors()? { - if available_fd < 96 { - rocksdb_options_destroy(options); - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Oxigraph needs at least 96 file descriptors, only {available_fd} allowed. Run e.g. `ulimit -n 512` to allow 512 opened files" - - ), - ) - .into()); - } - rocksdb_options_set_max_open_files( - options, - (available_fd - 48).try_into().unwrap(), - ); + let max_open_files = match &open_mode { + OpeningMode::Primary | OpeningMode::InMemory => { + if available_fd < 96 { + rocksdb_options_destroy(options); + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Oxigraph needs at least 96 file descriptors, \ + only {available_fd} allowed. \ + Run e.g. `ulimit -n 512` to allow 512 opened files" + ), + ) + .into()); + } + (available_fd - 48).try_into().unwrap() + } + OpeningMode::Secondary(_) => -1, + }; + rocksdb_options_set_max_open_files(options, max_open_files); } rocksdb_options_set_info_log_level(options, 2); // We only log warnings rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files rocksdb_options_set_compression( options, - if in_memory { + if open_mode == &OpeningMode::InMemory { rocksdb_no_compression } else { rocksdb_lz4_compression @@ -201,7 +294,7 @@ impl Db { ); rocksdb_options_set_env( options, - if in_memory { + if open_mode == &OpeningMode::InMemory { ROCKSDB_MEM_ENV.0 } else { ROCKSDB_ENV.0 @@ -219,12 +312,6 @@ impl Db { ); rocksdb_options_set_block_based_table_factory(options, block_based_table_options); - let transactiondb_options = rocksdb_transactiondb_options_create(); - assert!( - !transactiondb_options.is_null(), - "rocksdb_transactiondb_options_create returned null" - ); - if !column_families.iter().any(|c| c.name == "default") { column_families.push(ColumnFamilyDefinition { name: "default", @@ -261,24 +348,76 @@ impl Db { let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = vec![ptr::null_mut(); column_family_names.len()]; - let db = ffi_result!(rocksdb_transactiondb_open_column_families_with_status( - options, - transactiondb_options, - c_path.as_ptr(), - c_column_families.len().try_into().unwrap(), - c_column_families - .iter() - .map(|cf| cf.as_ptr()) - .collect::>() - .as_ptr(), - cf_options.as_ptr() as *const *const rocksdb_options_t, - cf_handles.as_mut_ptr(), - )) + + let c_num_column_families = c_column_families.len().try_into().unwrap(); + let c_column_family_names = c_column_families + .iter() + .map(|cf| cf.as_ptr()) + .collect::>(); + + let db = match &open_mode { + OpeningMode::Primary | OpeningMode::InMemory => { + let transactiondb_options = rocksdb_transactiondb_options_create(); + assert!( + !transactiondb_options.is_null(), + "rocksdb_transactiondb_options_create returned null" + ); + + ffi_result!(rocksdb_transactiondb_open_column_families_with_status( + options, + transactiondb_options, + c_path.as_ptr(), + c_num_column_families, + c_column_family_names.as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) + .map(|db| { + let transaction_options = rocksdb_transaction_options_create(); + assert!( + !transaction_options.is_null(), + "rocksdb_transaction_options_create returned null" + ); + rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); + let primary_db_handler = InnerDbHandlerPrimary { + db, + transaction_options, + transactiondb_options, + path, + }; + match open_mode { + OpeningMode::InMemory => InnerDbHandler::InMemory(primary_db_handler), + OpeningMode::Primary => InnerDbHandler::Primary(primary_db_handler), + _ => unreachable!(), + } + }) + .map_err(|e| { + rocksdb_transactiondb_options_destroy(transactiondb_options); + e + }) + } + OpeningMode::Secondary(secondary_path) => { + ffi_result!(rocksdb_open_as_secondary_column_families_with_status( + options, + c_path.as_ptr(), + path_to_cstring(secondary_path)?.as_ptr(), + c_num_column_families, + c_column_family_names.as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) + .map(|db| { + InnerDbHandler::Secondary(InnerDbHandlerSecondary { + db, + primary_path: path, + }) + }) + } + } .map_err(|e| { for cf_option in &cf_options { rocksdb_options_destroy(*cf_option); } - rocksdb_transactiondb_options_destroy(transactiondb_options); rocksdb_options_destroy(options); rocksdb_block_based_options_destroy(block_based_table_options); e @@ -302,7 +441,7 @@ impl Db { !write_options.is_null(), "rocksdb_writeoptions_create returned null" ); - if in_memory { + if open_mode == &OpeningMode::InMemory { rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL } @@ -330,18 +469,9 @@ impl Db { "rocksdb_compactoptions_create returned null" ); - let transaction_options = rocksdb_transaction_options_create(); - assert!( - !transaction_options.is_null(), - "rocksdb_transaction_options_create returned null" - ); - rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); - Ok(DbHandler { db, options, - transaction_options, - transactiondb_options, read_options, write_options, flush_options, @@ -352,8 +482,6 @@ impl Db { column_family_names, cf_handles, cf_options, - path, - in_memory, }) } } @@ -370,19 +498,35 @@ impl Db { #[must_use] pub fn snapshot(&self) -> Reader { unsafe { - let snapshot = rocksdb_transactiondb_create_snapshot(self.0.db); - assert!( - !snapshot.is_null(), - "rocksdb_transactiondb_create_snapshot returned null" - ); let options = rocksdb_readoptions_create_copy(self.0.read_options); - rocksdb_readoptions_set_snapshot(options, snapshot); - Reader { - inner: InnerReader::Snapshot(Rc::new(InnerSnapshot { - db: self.0.clone(), - snapshot, - })), - options, + match &self.0.db { + InnerDbHandler::InMemory(inner_db_handler) + | InnerDbHandler::Primary(inner_db_handler) => { + let snapshot = rocksdb_transactiondb_create_snapshot(inner_db_handler.db); + assert!( + !snapshot.is_null(), + "rocksdb_transactiondb_create_snapshot returned null" + ); + rocksdb_readoptions_set_snapshot(options, snapshot); + Reader { + inner: InnerReader::Snapshot(Rc::new(InnerSnapshot { + db: self.0.clone(), + snapshot, + })), + options, + } + } + InnerDbHandler::Secondary(_) => { + ffi_result!(rocksdb_try_catch_up_with_primary_with_status( + secondary_db_handler_or_error(&self.0.db).unwrap().db, + )) + .unwrap(); + + Reader { + inner: InnerReader::Secondary(self.0.clone()), + options, + } + } } } } @@ -393,10 +537,11 @@ impl Db { ) -> Result { loop { let transaction = unsafe { + let db = primary_db_handler_or_error(&self.0.db)?; let transaction = rocksdb_transaction_begin( - self.0.db, + db.db, self.0.write_options, - self.0.transaction_options, + db.transaction_options, ptr::null_mut(), ); assert!( @@ -464,13 +609,27 @@ impl Db { key: &[u8], ) -> Result, StorageError> { unsafe { - let slice = ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - self.0.db, - self.0.read_options, - column_family.0, - key.as_ptr() as *const c_char, - key.len() - ))?; + let slice = match &self.0.db { + InnerDbHandler::Secondary(inner_db_handler) => { + ffi_result!(rocksdb_get_pinned_cf_with_status( + inner_db_handler.db, + self.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + ))? + } + InnerDbHandler::Primary(inner_db_handler) + | InnerDbHandler::InMemory(inner_db_handler) => { + ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( + inner_db_handler.db, + self.0.read_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len() + ))? + } + }; Ok(if slice.is_null() { None } else { @@ -495,7 +654,7 @@ impl Db { ) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_put_cf_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, self.0.write_options, column_family.0, key.as_ptr() as *const c_char, @@ -510,7 +669,7 @@ impl Db { pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_flush_cf_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, self.0.flush_options, column_family.0, ))?; @@ -522,7 +681,7 @@ impl Db { pub fn compact(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { unsafe { ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, column_family.0, self.0.compaction_options, ptr::null(), @@ -536,7 +695,7 @@ impl Db { pub fn new_sst_file(&self) -> Result { unsafe { - let path = self.0.path.join(random::().to_string()); + let path = self.0.db.primary_path().join(random::().to_string()); let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options); ffi_result!(rocksdb_sstfilewriter_open_with_status( writer, @@ -576,7 +735,7 @@ impl Db { .collect::>(); unsafe { ffi_result!(rocksdb_transactiondb_ingest_external_files_with_status( - self.0.db, + primary_db_handler_or_error(&self.0.db)?.db, args.as_ptr(), args.len() ))?; @@ -585,20 +744,25 @@ impl Db { } pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { - if self.0.in_memory { - return Err(StorageError::Other( - "It is not possible to backup an in-memory database created with `Store::open`" + match &self.0.db { + InnerDbHandler::Secondary(_) => Err(StorageError::Other( + "It is not possible to backup an database opened as secondary.".into(), + )), + InnerDbHandler::InMemory(_) => Err(StorageError::Other( + "It is not possible to backup an in-memory database created with `Store::new`" .into(), - )); - } - let path = path_to_cstring(target_directory)?; - unsafe { - ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status( - self.0.db, - path.as_ptr() - ))?; + )), + InnerDbHandler::Primary(db) => { + let path = path_to_cstring(target_directory)?; + unsafe { + ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status( + db.db, + path.as_ptr() + ))?; + } + Ok(()) + } } - Ok(()) } } @@ -619,16 +783,24 @@ pub struct Reader { enum InnerReader { Snapshot(Rc), Transaction(Weak<*mut rocksdb_transaction_t>), + Secondary(Arc), // TODO: we should not hold the whole enum, because we have to use + // secondary_db_handler_or_error to access rocksdb_t. we should + // safe it directly?! } struct InnerSnapshot { - db: Arc, + db: Arc, // TODO: probably the same as InnerReader::Secondary snapshot: *const rocksdb_snapshot_t, } impl Drop for InnerSnapshot { fn drop(&mut self) { - unsafe { rocksdb_transactiondb_release_snapshot(self.db.db, self.snapshot) } + unsafe { + rocksdb_transactiondb_release_snapshot( + primary_db_handler_or_error(&self.db.db).unwrap().db, + self.snapshot, + ) + } } } @@ -657,7 +829,7 @@ impl Reader { let slice = match &self.inner { InnerReader::Snapshot(inner) => { ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - inner.db.db, + primary_db_handler_or_error(&inner.db.db)?.db, self.options, column_family.0, key.as_ptr() as *const c_char, @@ -679,6 +851,11 @@ impl Reader { )); } } + InnerReader::Secondary(_) => { + return Err(StorageError::Other( + "Database is opened as secondary, can not execute Reader.get".into(), + )) + } }?; Ok(if slice.is_null() { None @@ -737,9 +914,11 @@ impl Reader { ); } let iter = match &self.inner { - InnerReader::Snapshot(inner) => { - rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0) - } + InnerReader::Snapshot(inner) => rocksdb_transactiondb_create_iterator_cf( + primary_db_handler_or_error(&inner.db.db)?.db, + options, + column_family.0, + ), InnerReader::Transaction(inner) => { if let Some(inner) = inner.upgrade() { rocksdb_transaction_create_iterator_cf(*inner, options, column_family.0) @@ -749,6 +928,10 @@ impl Reader { )); } } + InnerReader::Secondary(inner) => { + let db = secondary_db_handler_or_error(&inner.db)?.db; + rocksdb_create_iterator_cf(db, options, column_family.0) + } }; assert!(!iter.is_null(), "rocksdb_create_iterator returned null"); if prefix.is_empty() { diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index b3bf01a9..0c86d3e5 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -14,6 +14,7 @@ pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, S #[cfg(not(target_family = "wasm"))] use crate::storage::numeric_encoder::Decoder; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; +pub use crate::store::StoreOpenOptions; use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; #[cfg(not(target_family = "wasm"))] use std::cmp::{max, min}; @@ -90,6 +91,11 @@ impl Storage { Self::setup(Db::open(path, Self::column_families())?) } + #[cfg(not(target_family = "wasm"))] + pub fn open_with_options(options: StoreOpenOptions) -> Result { + Self::setup(Db::open_with_options(options, Self::column_families())?) + } + fn column_families() -> Vec { vec![ ColumnFamilyDefinition { diff --git a/lib/src/store.rs b/lib/src/store.rs index 7e6974f4..9b8cfba5 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -43,8 +43,18 @@ use std::error::Error; use std::io::{BufRead, Write}; #[cfg(not(target_family = "wasm"))] use std::path::Path; +use std::path::PathBuf; use std::{fmt, str}; +pub struct SecondaryOptions { + pub path: PathBuf, + pub secondary_path: PathBuf, +} + +pub enum StoreOpenOptions { + OpenAsSecondary(SecondaryOptions), +} + /// An on-disk [RDF dataset](https://www.w3.org/TR/rdf11-concepts/#dfn-rdf-dataset). /// Allows to query and update it using SPARQL. /// It is based on the [RocksDB](https://rocksdb.org/) key-value store. @@ -102,6 +112,14 @@ impl Store { }) } + /// Opens a [`Store`] with options + #[cfg(not(target_family = "wasm"))] + pub fn open_with_options(options: StoreOpenOptions) -> Result { + Ok(Self { + storage: Storage::open_with_options(options)?, + }) + } + /// Executes a [SPARQL 1.1 query](https://www.w3.org/TR/sparql11-query/). /// /// Usage example: diff --git a/oxrocksdb-sys/api/c.cc b/oxrocksdb-sys/api/c.cc index 287e29fe..f9b660f0 100644 --- a/oxrocksdb-sys/api/c.cc +++ b/oxrocksdb-sys/api/c.cc @@ -18,6 +18,60 @@ static bool SaveStatus(rocksdb_status_t* target, const Status source) { extern "C" { +rocksdb_pinnableslice_t* rocksdb_get_pinned_cf_with_status( + rocksdb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, rocksdb_status_t* statusptr) { + rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); + Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), + &v->rep); + if (!s.ok()) { + delete v; + if (!s.IsNotFound()) { + SaveStatus(statusptr, s); + } + return nullptr; + } + return v; +} + + +void rocksdb_try_catch_up_with_primary_with_status( + rocksdb_t* db, rocksdb_status_t* statusptr) { + SaveStatus(statusptr, db->rep->TryCatchUpWithPrimary()); +} + +rocksdb_t* rocksdb_open_as_secondary_column_families_with_status( + const rocksdb_options_t* db_options, const char* name, + const char* secondary_path, int num_column_families, + const char* const* column_family_names, + const rocksdb_options_t* const* column_family_options, + rocksdb_column_family_handle_t** column_family_handles, + rocksdb_status_t* statusptr) { + std::vector column_families; + for (int i = 0; i != num_column_families; ++i) { + column_families.emplace_back( + std::string(column_family_names[i]), + ColumnFamilyOptions(column_family_options[i]->rep)); + } + DB* db; + std::vector handles; + if (SaveStatus(statusptr, DB::OpenAsSecondary(DBOptions(db_options->rep), + std::string(name), + std::string(secondary_path), + column_families, &handles, &db))) { + return nullptr; + } + for (size_t i = 0; i != handles.size(); ++i) { + rocksdb_column_family_handle_t* c_handle = + new rocksdb_column_family_handle_t; + c_handle->rep = handles[i]; + column_family_handles[i] = c_handle; + } + rocksdb_t* result = new rocksdb_t; + result->rep = db; + return result; +} rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status( const rocksdb_options_t* options, diff --git a/oxrocksdb-sys/api/c.h b/oxrocksdb-sys/api/c.h index 5b875a01..345d2200 100644 --- a/oxrocksdb-sys/api/c.h +++ b/oxrocksdb-sys/api/c.h @@ -65,6 +65,22 @@ typedef struct rocksdb_ingestexternalfilearg_t { rocksdb_ingestexternalfileoptions_t* options; } rocksdb_ingestexternalfilearg_t; +rocksdb_pinnableslice_t* rocksdb_get_pinned_cf_with_status( + rocksdb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, rocksdb_status_t* statusptr); + +extern ROCKSDB_LIBRARY_API void rocksdb_try_catch_up_with_primary_with_status( + rocksdb_t* db, rocksdb_status_t* statusptr); + +extern ROCKSDB_LIBRARY_API rocksdb_t* rocksdb_open_as_secondary_column_families_with_status( + const rocksdb_options_t* options, const char* name, + const char* secondary_path, int num_column_families, + const char* const* column_family_names, + const rocksdb_options_t* const* column_family_options, + rocksdb_column_family_handle_t** column_family_handles, + rocksdb_status_t* statusptr); + extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status( const rocksdb_options_t* options, const rocksdb_transactiondb_options_t* txn_db_options, const char* name, diff --git a/server/src/main.rs b/server/src/main.rs index 79e9e71f..5fae8c3d 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -6,7 +6,7 @@ use oxhttp::Server; use oxigraph::io::{DatasetFormat, DatasetSerializer, GraphFormat, GraphSerializer}; use oxigraph::model::{GraphName, GraphNameRef, IriParseError, NamedNode, NamedOrBlankNode}; use oxigraph::sparql::{Query, QueryResults, Update}; -use oxigraph::store::{BulkLoader, LoaderError, Store}; +use oxigraph::store::{BulkLoader, LoaderError, SecondaryOptions, Store, StoreOpenOptions}; use oxiri::Iri; use rand::random; use rayon_core::ThreadPoolBuilder; @@ -37,6 +37,10 @@ struct Args { /// Directory in which persist the data. #[arg(short, long, global = true)] location: Option, + /// Open underlying database in readonly mode, specify path to store info logs (LOG) + // see https://github.com/facebook/rocksdb/wiki/Read-only-and-Secondary-instances + #[arg(short, long, global = true)] + secondary_location: Option, #[command(subcommand)] command: Command, } @@ -67,7 +71,14 @@ enum Command { pub fn main() -> anyhow::Result<()> { let matches = Args::parse(); let store = if let Some(path) = &matches.location { - Store::open(path) + if let Some(secondary_path) = &matches.secondary_location { + Store::open_with_options(StoreOpenOptions::OpenAsSecondary(SecondaryOptions { + path: path.to_path_buf(), + secondary_path: secondary_path.to_path_buf(), + })) + } else { + Store::open(path) + } } else { Store::new() }?;