diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 7fb6d0ca..21c3fa89 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -24,14 +24,6 @@ use std::sync::Arc; use std::thread::{available_parallelism, yield_now}; use std::{ptr, slice}; -#[derive(PartialEq, Eq)] -enum OpeningMode { - InMemory, - Primary, - ReadOnly, - Secondary(PathBuf), -} - macro_rules! ffi_result { ( $($function:ident)::*() ) => { ffi_result_impl!($($function)::*()) @@ -76,28 +68,6 @@ lazy_static! { }; } -fn primary_db_handler_or_error( - db: &InnerDbHandler, -) -> Result<&InnerDbHandlerPrimary, StorageError> { - match db { - InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) => Ok(inner_db), - _ => Err(StorageError::Other( - "Database is opened as secondary or readonly, can not execute this operation.".into(), - )), - } -} - -fn secondary_db_handler_or_error( - db: &InnerDbHandler, -) -> Result<&InnerDbHandlerSecondary, StorageError> { - match db { - InnerDbHandler::ReadOnly(inner_db) | InnerDbHandler::Secondary(inner_db) => Ok(inner_db), - _ => Err(StorageError::Other(Box::::from( - "Expected secondary/read-only InnerDbHandler, got primary/in memory one", - ))), - } -} - pub struct ColumnFamilyDefinition { pub name: &'static str, pub use_iter: bool, @@ -106,52 +76,21 @@ pub struct ColumnFamilyDefinition { } #[derive(Clone)] -pub struct Db(Arc); - -#[allow(clippy::non_send_fields_in_send_ty)] -unsafe impl Send for Db {} - -unsafe impl Sync for Db {} +pub struct Db { + inner: DbKind, +} -/// Handler that has read only access to the database -struct InnerDbHandlerSecondary { - db: *mut rocksdb_t, - primary_path: PathBuf, +#[derive(Clone)] +enum DbKind { + ReadOnly(Arc), + ReadWrite(Arc), } -/// Handler that has read and write access to the database -struct InnerDbHandlerPrimary { +struct RwDbHandler { db: *mut rocksdb_transactiondb_t, + options: *mut rocksdb_options_t, transaction_options: *mut rocksdb_transaction_options_t, transactiondb_options: *mut rocksdb_transactiondb_options_t, - path: PathBuf, -} - -enum InnerDbHandler { - Primary(InnerDbHandlerPrimary), - InMemory(InnerDbHandlerPrimary), - Secondary(InnerDbHandlerSecondary), - ReadOnly(InnerDbHandlerSecondary), -} - -impl InnerDbHandler { - fn is_null(&self) -> bool { - match self { - Self::Primary(s) | Self::InMemory(s) => s.db.is_null(), - Self::ReadOnly(s) | Self::Secondary(s) => s.db.is_null(), - } - } - fn primary_path(&self) -> &PathBuf { - match self { - Self::Primary(s) | Self::InMemory(s) => &s.path, - Self::ReadOnly(s) | Self::Secondary(s) => &s.primary_path, - } - } -} - -struct DbHandler { - db: InnerDbHandler, - options: *mut rocksdb_options_t, read_options: *mut rocksdb_readoptions_t, write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, @@ -162,22 +101,21 @@ struct DbHandler { column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_options: Vec<*mut rocksdb_options_t>, + path: PathBuf, + in_memory: bool, } -impl Drop for DbHandler { +unsafe impl Send for RwDbHandler {} + +unsafe impl Sync for RwDbHandler {} + +impl Drop for RwDbHandler { fn drop(&mut self) { unsafe { for cf_handle in &self.cf_handles { rocksdb_column_family_handle_destroy(*cf_handle); } - match &self.db { - InnerDbHandler::Primary(s) | InnerDbHandler::InMemory(s) => { - rocksdb_transactiondb_close(s.db); - } - InnerDbHandler::Secondary(s) | InnerDbHandler::ReadOnly(s) => { - rocksdb_close(s.db); - } - } + 
rocksdb_transactiondb_close(self.db); for cf_option in &self.cf_options { rocksdb_options_destroy(*cf_option); } @@ -187,19 +125,43 @@ impl Drop for DbHandler { rocksdb_envoptions_destroy(self.env_options); rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options); rocksdb_compactoptions_destroy(self.compaction_options); - if let InnerDbHandler::Primary(inner_db) | InnerDbHandler::InMemory(inner_db) = &self.db - { - rocksdb_transaction_options_destroy(inner_db.transaction_options); - rocksdb_transactiondb_options_destroy(inner_db.transactiondb_options); - } + rocksdb_transaction_options_destroy(self.transaction_options); + rocksdb_transactiondb_options_destroy(self.transactiondb_options); rocksdb_options_destroy(self.options); rocksdb_block_based_options_destroy(self.block_based_table_options); } + if self.in_memory && self.path.exists() { + remove_dir_all(&self.path).unwrap(); + } + } +} + +struct RoDbHandler { + db: *mut rocksdb_t, + options: *mut rocksdb_options_t, + read_options: *mut rocksdb_readoptions_t, + column_family_names: Vec<&'static str>, + cf_handles: Vec<*mut rocksdb_column_family_handle_t>, + cf_options: Vec<*mut rocksdb_options_t>, + is_secondary: bool, +} - if let InnerDbHandler::InMemory(db) = &self.db { - if db.path.exists() { - remove_dir_all(&db.path).unwrap(); +unsafe impl Send for RoDbHandler {} + +unsafe impl Sync for RoDbHandler {} + +impl Drop for RoDbHandler { + fn drop(&mut self) { + unsafe { + for cf_handle in &self.cf_handles { + rocksdb_column_family_handle_destroy(*cf_handle); } + rocksdb_close(self.db); + for cf_option in &self.cf_options { + rocksdb_options_destroy(*cf_option); + } + rocksdb_readoptions_destroy(self.read_options); + rocksdb_options_destroy(self.options); } } } @@ -212,90 +174,29 @@ impl Db { temp_dir() } .join(format!("oxigraph-rocksdb-{}", random::())); - Ok(Self(Arc::new(Self::do_open( - path, - column_families, - &OpeningMode::InMemory, - )?))) + Self::open_read_write(path, column_families, true) } pub fn open( path: &Path, column_families: Vec, ) -> Result { - Ok(Self(Arc::new(Self::do_open( - path.to_path_buf(), - column_families, - &OpeningMode::Primary, - )?))) + Self::open_read_write(path.into(), column_families, false) } - pub fn open_secondary( - primary_path: &Path, - secondary_path: &Path, - column_families: Vec, - ) -> Result { - Ok(Self(Arc::new(Self::do_open( - primary_path.to_path_buf(), - column_families, - &OpeningMode::Secondary(secondary_path.to_path_buf()), - )?))) - } - - pub fn open_read_only( - path: &Path, + fn open_read_write( + path: PathBuf, column_families: Vec, + in_memory: bool, ) -> Result { - Ok(Self(Arc::new(Self::do_open( - path.to_path_buf(), - column_families, - &OpeningMode::ReadOnly, - )?))) - } - - fn do_open( - path: PathBuf, - mut column_families: Vec, - open_mode: &OpeningMode, - ) -> Result { - let c_path = path_to_cstring(&path)?; unsafe { - let options = rocksdb_options_create(); - assert!(!options.is_null(), "rocksdb_options_create returned null"); + let c_path = path_to_cstring(&path)?; + let options = Self::db_options(true, in_memory)?; rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_missing_column_families(options, 1); - rocksdb_options_optimize_level_style_compaction(options, 512 * 1024 * 1024); - rocksdb_options_increase_parallelism( - options, - available_parallelism()?.get().try_into().unwrap(), - ); - if let Some(available_fd) = available_file_descriptors()? 
{ - let max_open_files = match &open_mode { - OpeningMode::Primary | OpeningMode::InMemory | OpeningMode::ReadOnly => { - if available_fd < 96 { - rocksdb_options_destroy(options); - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Oxigraph needs at least 96 file descriptors, \ - only {available_fd} allowed. \ - Run e.g. `ulimit -n 512` to allow 512 opened files" - ), - ) - .into()); - } - (available_fd - 48).try_into().unwrap() - } - OpeningMode::Secondary(_) => -1, - }; - rocksdb_options_set_max_open_files(options, max_open_files); - } - rocksdb_options_set_info_log_level(options, 2); // We only log warnings - rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size - rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files rocksdb_options_set_compression( options, - if open_mode == &OpeningMode::InMemory { + if in_memory { rocksdb_no_compression } else { rocksdb_lz4_compression @@ -303,14 +204,6 @@ impl Db { .try_into() .unwrap(), ); - rocksdb_options_set_env( - options, - if open_mode == &OpeningMode::InMemory { - ROCKSDB_MEM_ENV.0 - } else { - ROCKSDB_ENV.0 - }, - ); let block_based_table_options = rocksdb_block_based_options_create(); assert!( !block_based_table_options.is_null(), @@ -323,126 +216,33 @@ impl Db { ); rocksdb_options_set_block_based_table_factory(options, block_based_table_options); - if !column_families.iter().any(|c| c.name == "default") { - column_families.push(ColumnFamilyDefinition { - name: "default", - use_iter: true, - min_prefix_size: 0, - unordered_writes: false, - }) - } - let column_family_names = column_families.iter().map(|c| c.name).collect::>(); - let c_column_families = column_family_names - .iter() - .map(|name| CString::new(*name)) - .collect::, _>>() - .map_err(|e| StorageError::Other(Box::new(e)))?; - let cf_options = column_families - .into_iter() - .map(|cf| { - let options = rocksdb_options_create_copy(options); - if !cf.use_iter { - rocksdb_options_optimize_for_point_lookup(options, 128); - } - if cf.min_prefix_size > 0 { - rocksdb_options_set_prefix_extractor( - options, - rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), - ); - } - if cf.unordered_writes { - rocksdb_options_set_unordered_write(options, 1); - } - options - }) - .collect::>(); - + let (column_family_names, c_column_family_names, cf_options) = + Self::column_families_names_and_options(column_families, options); let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = vec![ptr::null_mut(); column_family_names.len()]; + let c_num_column_families = c_column_family_names.len().try_into().unwrap(); - let c_num_column_families = c_column_families.len().try_into().unwrap(); - let c_column_family_names = c_column_families - .iter() - .map(|cf| cf.as_ptr()) - .collect::>(); - - let db = match &open_mode { - OpeningMode::Primary | OpeningMode::InMemory => { - let transactiondb_options = rocksdb_transactiondb_options_create(); - assert!( - !transactiondb_options.is_null(), - "rocksdb_transactiondb_options_create returned null" - ); + let transactiondb_options = rocksdb_transactiondb_options_create(); + assert!( + !transactiondb_options.is_null(), + "rocksdb_transactiondb_options_create returned null" + ); - ffi_result!(rocksdb_transactiondb_open_column_families_with_status( - options, - transactiondb_options, - c_path.as_ptr(), - c_num_column_families, - c_column_family_names.as_ptr(), - cf_options.as_ptr() as *const *const rocksdb_options_t, - cf_handles.as_mut_ptr(), - )) - .map(|db| { - 
let transaction_options = rocksdb_transaction_options_create(); - assert!( - !transaction_options.is_null(), - "rocksdb_transaction_options_create returned null" - ); - rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); - let primary_db_handler = InnerDbHandlerPrimary { - db, - transaction_options, - transactiondb_options, - path, - }; - match open_mode { - OpeningMode::InMemory => InnerDbHandler::InMemory(primary_db_handler), - OpeningMode::Primary => InnerDbHandler::Primary(primary_db_handler), - _ => unreachable!(), - } - }) - .map_err(|e| { - rocksdb_transactiondb_options_destroy(transactiondb_options); - e - }) - } - OpeningMode::Secondary(secondary_path) => { - ffi_result!(rocksdb_open_as_secondary_column_families_with_status( - options, - c_path.as_ptr(), - path_to_cstring(secondary_path)?.as_ptr(), - c_num_column_families, - c_column_family_names.as_ptr(), - cf_options.as_ptr() as *const *const rocksdb_options_t, - cf_handles.as_mut_ptr(), - )) - .map(|db| { - InnerDbHandler::Secondary(InnerDbHandlerSecondary { - db, - primary_path: path, - }) - }) - } - OpeningMode::ReadOnly => { - ffi_result!(rocksdb_open_for_read_only_column_families_with_status( - options, - c_path.as_ptr(), - c_num_column_families, - c_column_family_names.as_ptr(), - cf_options.as_ptr() as *const *const rocksdb_options_t, - cf_handles.as_mut_ptr(), - 0, // false - )) - .map(|db| { - InnerDbHandler::ReadOnly(InnerDbHandlerSecondary { - db, - primary_path: path, - }) - }) - } - } + let db = ffi_result!(rocksdb_transactiondb_open_column_families_with_status( + options, + transactiondb_options, + c_path.as_ptr(), + c_num_column_families, + c_column_family_names + .iter() + .map(|cf| cf.as_ptr()) + .collect::>() + .as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) .map_err(|e| { + rocksdb_transactiondb_options_destroy(transactiondb_options); for cf_option in &cf_options { rocksdb_options_destroy(*cf_option); } @@ -469,10 +269,17 @@ impl Db { !write_options.is_null(), "rocksdb_writeoptions_create returned null" ); - if open_mode == &OpeningMode::InMemory { + if in_memory { rocksdb_writeoptions_disable_WAL(write_options, 1); // No need for WAL } + let transaction_options = rocksdb_transaction_options_create(); + assert!( + !transaction_options.is_null(), + "rocksdb_transaction_options_create returned null" + ); + rocksdb_transaction_options_set_set_snapshot(transaction_options, 1); + let flush_options = rocksdb_flushoptions_create(); assert!( !flush_options.is_null(), @@ -497,25 +304,249 @@ impl Db { "rocksdb_compactoptions_create returned null" ); - Ok(DbHandler { - db, + Ok(Self { + inner: DbKind::ReadWrite(Arc::new(RwDbHandler { + db, + options, + transaction_options, + transactiondb_options, + read_options, + write_options, + flush_options, + env_options, + ingest_external_file_options, + compaction_options, + block_based_table_options, + column_family_names, + cf_handles, + cf_options, + path, + in_memory, + })), + }) + } + } + + pub fn open_secondary( + primary_path: &Path, + secondary_path: &Path, + column_families: Vec, + ) -> Result { + unsafe { + let c_primary_path = path_to_cstring(primary_path)?; + let c_secondary_path = path_to_cstring(secondary_path)?; + let options = Self::db_options(false, false)?; + let (column_family_names, c_column_family_names, cf_options) = + Self::column_families_names_and_options(column_families, options); + let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = + vec![ptr::null_mut(); 
column_family_names.len()]; + let c_num_column_families = c_column_family_names.len().try_into().unwrap(); + let db = ffi_result!(rocksdb_open_as_secondary_column_families_with_status( options, - read_options, - write_options, - flush_options, - env_options, - ingest_external_file_options, - compaction_options, - block_based_table_options, - column_family_names, - cf_handles, - cf_options, + c_primary_path.as_ptr(), + c_secondary_path.as_ptr(), + c_num_column_families, + c_column_family_names + .iter() + .map(|cf| cf.as_ptr()) + .collect::>() + .as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + )) + .map_err(|e| { + for cf_option in &cf_options { + rocksdb_options_destroy(*cf_option); + } + rocksdb_options_destroy(options); + e + })?; + assert!( + !db.is_null(), + "rocksdb_open_for_read_only_column_families_with_status returned null" + ); + for handle in &cf_handles { + assert!( + !handle.is_null(), + "rocksdb_open_for_read_only_column_families_with_status returned a null column family" + ); + } + let read_options = rocksdb_readoptions_create(); + assert!( + !read_options.is_null(), + "rocksdb_readoptions_create returned null" + ); + Ok(Self { + inner: DbKind::ReadOnly(Arc::new(RoDbHandler { + db, + options, + read_options, + column_family_names, + cf_handles, + cf_options, + is_secondary: true, + })), }) } } + pub fn open_read_only( + path: &Path, + column_families: Vec, + ) -> Result { + unsafe { + let c_path = path_to_cstring(path)?; + let options = Self::db_options(true, false)?; + let (column_family_names, c_column_family_names, cf_options) = + Self::column_families_names_and_options(column_families, options); + let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = + vec![ptr::null_mut(); column_family_names.len()]; + let c_num_column_families = c_column_family_names.len().try_into().unwrap(); + let db = ffi_result!(rocksdb_open_for_read_only_column_families_with_status( + options, + c_path.as_ptr(), + c_num_column_families, + c_column_family_names + .iter() + .map(|cf| cf.as_ptr()) + .collect::>() + .as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, + cf_handles.as_mut_ptr(), + 0, // false + )) + .map_err(|e| { + for cf_option in &cf_options { + rocksdb_options_destroy(*cf_option); + } + rocksdb_options_destroy(options); + e + })?; + assert!( + !db.is_null(), + "rocksdb_open_for_read_only_column_families_with_status returned null" + ); + for handle in &cf_handles { + assert!( + !handle.is_null(), + "rocksdb_open_for_read_only_column_families_with_status returned a null column family" + ); + } + let read_options = rocksdb_readoptions_create(); + assert!( + !read_options.is_null(), + "rocksdb_readoptions_create returned null" + ); + + Ok(Self { + inner: DbKind::ReadOnly(Arc::new(RoDbHandler { + db, + options, + read_options, + column_family_names, + cf_handles, + cf_options, + is_secondary: false, + })), + }) + } + } + + fn db_options( + limit_max_open_files: bool, + in_memory: bool, + ) -> Result<*mut rocksdb_options_t, StorageError> { + unsafe { + let options = rocksdb_options_create(); + assert!(!options.is_null(), "rocksdb_options_create returned null"); + rocksdb_options_optimize_level_style_compaction(options, 512 * 1024 * 1024); + rocksdb_options_increase_parallelism( + options, + available_parallelism()?.get().try_into().unwrap(), + ); + if limit_max_open_files { + if let Some(available_fd) = available_file_descriptors()? 
{ + if available_fd < 96 { + rocksdb_options_destroy(options); + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Oxigraph needs at least 96 file descriptors, \ + only {available_fd} allowed. \ + Run e.g. `ulimit -n 512` to allow 512 opened files" + ), + ) + .into()); + } + rocksdb_options_set_max_open_files( + options, + (available_fd - 48).try_into().unwrap(), + ) + } + } else { + rocksdb_options_set_max_open_files(options, -1); + } + rocksdb_options_set_info_log_level(options, 2); // We only log warnings + rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size + rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files + rocksdb_options_set_env( + options, + if in_memory { + ROCKSDB_MEM_ENV.0 + } else { + ROCKSDB_ENV.0 + }, + ); + Ok(options) + } + } + + fn column_families_names_and_options( + mut column_families: Vec, + base_options: *mut rocksdb_options_t, + ) -> (Vec<&'static str>, Vec, Vec<*mut rocksdb_options_t>) { + if !column_families.iter().any(|c| c.name == "default") { + column_families.push(ColumnFamilyDefinition { + name: "default", + use_iter: true, + min_prefix_size: 0, + unordered_writes: false, + }) + } + let column_family_names = column_families.iter().map(|c| c.name).collect::>(); + let c_column_family_names = column_family_names + .iter() + .map(|name| CString::new(*name).unwrap()) + .collect(); + + let cf_options = column_families + .into_iter() + .map(|cf| unsafe { + let options = rocksdb_options_create_copy(base_options); + if !cf.use_iter { + rocksdb_options_optimize_for_point_lookup(options, 128); + } + if cf.min_prefix_size > 0 { + rocksdb_options_set_prefix_extractor( + options, + rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), + ); + } + if cf.unordered_writes { + rocksdb_options_set_unordered_write(options, 1); + } + options + }) + .collect::>(); + (column_family_names, c_column_family_names, cf_options) + } + pub fn column_family(&self, name: &'static str) -> Option { - for (cf, cf_handle) in self.0.column_family_names.iter().zip(&self.0.cf_handles) { + let (column_family_names, cf_handles) = match &self.inner { + DbKind::ReadOnly(db) => (&db.column_family_names, &db.cf_handles), + DbKind::ReadWrite(db) => (&db.column_family_names, &db.cf_handles), + }; + for (cf, cf_handle) in column_family_names.iter().zip(cf_handles) { if *cf == name { return Some(ColumnFamily(*cf_handle)); } @@ -526,39 +557,35 @@ impl Db { #[must_use] pub fn snapshot(&self) -> Reader { unsafe { - let options = rocksdb_readoptions_create_copy(self.0.read_options); - match &self.0.db { - InnerDbHandler::InMemory(inner_db_handler) - | InnerDbHandler::Primary(inner_db_handler) => { - let snapshot = rocksdb_transactiondb_create_snapshot(inner_db_handler.db); + match &self.inner { + DbKind::ReadOnly(db) => { + if db.is_secondary { + // We try to refresh (and ignore the errors) + #[allow(clippy::let_underscore_must_use)] + let _ = ffi_result!(rocksdb_try_catch_up_with_primary_with_status(db.db)); + } + let options = rocksdb_readoptions_create_copy(db.read_options); + Reader { + inner: InnerReader::PlainDb(db.clone()), + options, + } + } + DbKind::ReadWrite(db) => { + let options = rocksdb_readoptions_create_copy(db.read_options); + let snapshot = rocksdb_transactiondb_create_snapshot(db.db); assert!( !snapshot.is_null(), "rocksdb_transactiondb_create_snapshot returned null" ); rocksdb_readoptions_set_snapshot(options, snapshot); Reader { - inner: InnerReader::Snapshot(Rc::new(InnerSnapshot { - db: 
self.0.clone(), + inner: InnerReader::TransactionalSnapshot(Rc::new(TransactionalSnapshot { + db: db.clone(), snapshot, })), options, } } - InnerDbHandler::ReadOnly(_) => Reader { - inner: InnerReader::Secondary(self.0.clone()), - options, - }, - InnerDbHandler::Secondary(_) => { - ffi_result!(rocksdb_try_catch_up_with_primary_with_status( - secondary_db_handler_or_error(&self.0.db).unwrap().db, - )) - .unwrap(); - - Reader { - inner: InnerReader::Secondary(self.0.clone()), - options, - } - } } } } @@ -567,71 +594,81 @@ impl Db { &'b self, f: impl Fn(Transaction<'a>) -> Result, ) -> Result { - loop { - let transaction = unsafe { - let db = primary_db_handler_or_error(&self.0.db)?; - let transaction = rocksdb_transaction_begin( - db.db, - self.0.write_options, - db.transaction_options, - ptr::null_mut(), - ); - assert!( - !transaction.is_null(), - "rocksdb_transaction_begin returned null" - ); - transaction - }; - let (read_options, snapshot) = unsafe { - let options = rocksdb_readoptions_create_copy(self.0.read_options); - let snapshot = rocksdb_transaction_get_snapshot(transaction); - rocksdb_readoptions_set_snapshot(options, snapshot); - (options, snapshot) - }; - let result = f(Transaction { - transaction: Rc::new(transaction), - read_options, - _lifetime: PhantomData::default(), - }); - match result { - Ok(result) => { - unsafe { - let r = ffi_result!(rocksdb_transaction_commit_with_status(transaction)); - rocksdb_transaction_destroy(transaction); - rocksdb_readoptions_destroy(read_options); - free(snapshot as *mut c_void); - r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails - } - return Ok(result); - } - Err(e) => { - unsafe { - let r = ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); - rocksdb_transaction_destroy(transaction); - rocksdb_readoptions_destroy(read_options); - free(snapshot as *mut c_void); - r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails - } - // We look for the root error - let mut error: &(dyn Error + 'static) = &e; - while let Some(e) = error.source() { - error = e; + if let DbKind::ReadWrite(db) = &self.inner { + loop { + let transaction = unsafe { + let transaction = rocksdb_transaction_begin( + db.db, + db.write_options, + db.transaction_options, + ptr::null_mut(), + ); + assert!( + !transaction.is_null(), + "rocksdb_transaction_begin returned null" + ); + transaction + }; + let (read_options, snapshot) = unsafe { + let options = rocksdb_readoptions_create_copy(db.read_options); + let snapshot = rocksdb_transaction_get_snapshot(transaction); + rocksdb_readoptions_set_snapshot(options, snapshot); + (options, snapshot) + }; + let result = f(Transaction { + transaction: Rc::new(transaction), + read_options, + _lifetime: PhantomData::default(), + }); + match result { + Ok(result) => { + unsafe { + let r = + ffi_result!(rocksdb_transaction_commit_with_status(transaction)); + rocksdb_transaction_destroy(transaction); + rocksdb_readoptions_destroy(read_options); + free(snapshot as *mut c_void); + r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails + } + return Ok(result); } - let is_conflict_error = - error.downcast_ref::().map_or(false, |e| { - e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy - || e.0.code == rocksdb_status_code_t_rocksdb_status_code_timed_out - || e.0.code == rocksdb_status_code_t_rocksdb_status_code_try_again - }); - if is_conflict_error { - // We give a chance to the OS to do something else before 
retrying in order to help avoiding another conflict - yield_now(); - } else { - // We raise the error - return Err(e); + Err(e) => { + unsafe { + let r = + ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); + rocksdb_transaction_destroy(transaction); + rocksdb_readoptions_destroy(read_options); + free(snapshot as *mut c_void); + r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails + } + // We look for the root error + let mut error: &(dyn Error + 'static) = &e; + while let Some(e) = error.source() { + error = e; + } + let is_conflict_error = + error.downcast_ref::().map_or(false, |e| { + e.0.code == rocksdb_status_code_t_rocksdb_status_code_busy + || e.0.code + == rocksdb_status_code_t_rocksdb_status_code_timed_out + || e.0.code + == rocksdb_status_code_t_rocksdb_status_code_try_again + }); + if is_conflict_error { + // We give a chance to the OS to do something else before retrying in order to help avoiding another conflict + yield_now(); + } else { + // We raise the error + return Err(e); + } } } } + } else { + Err( + StorageError::Other("Transaction are only possible on read-write instances".into()) + .into(), + ) } } @@ -641,28 +678,26 @@ impl Db { key: &[u8], ) -> Result, StorageError> { unsafe { - let slice = match &self.0.db { - InnerDbHandler::Secondary(inner_db_handler) - | InnerDbHandler::ReadOnly(inner_db_handler) => { + let slice = match &self.inner { + DbKind::ReadOnly(db) => { ffi_result!(rocksdb_get_pinned_cf_with_status( - inner_db_handler.db, - self.0.read_options, + db.db, + db.read_options, column_family.0, key.as_ptr() as *const c_char, key.len(), - ))? + )) } - InnerDbHandler::Primary(inner_db_handler) - | InnerDbHandler::InMemory(inner_db_handler) => { + DbKind::ReadWrite(db) => { ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - inner_db_handler.db, - self.0.read_options, + db.db, + db.read_options, column_family.0, key.as_ptr() as *const c_char, key.len() - ))? 
+ )) } - }; + }?; Ok(if slice.is_null() { None } else { @@ -685,60 +720,83 @@ impl Db { key: &[u8], value: &[u8], ) -> Result<(), StorageError> { - unsafe { - ffi_result!(rocksdb_transactiondb_put_cf_with_status( - primary_db_handler_or_error(&self.0.db)?.db, - self.0.write_options, - column_family.0, - key.as_ptr() as *const c_char, - key.len(), - value.as_ptr() as *const c_char, - value.len(), - ))?; + if let DbKind::ReadWrite(db) = &self.inner { + unsafe { + ffi_result!(rocksdb_transactiondb_put_cf_with_status( + db.db, + db.write_options, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + )) + }?; + Ok(()) + } else { + Err(StorageError::Other( + "Inserts are only possible on read-write instances".into(), + )) } - Ok(()) } pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { - unsafe { - ffi_result!(rocksdb_transactiondb_flush_cf_with_status( - primary_db_handler_or_error(&self.0.db)?.db, - self.0.flush_options, - column_family.0, - ))?; + if let DbKind::ReadWrite(db) = &self.inner { + unsafe { + ffi_result!(rocksdb_transactiondb_flush_cf_with_status( + db.db, + db.flush_options, + column_family.0, + )) + }?; + Ok(()) + } else { + Err(StorageError::Other( + "Flush is only possible on read-write instances".into(), + )) } - Ok(()) } - #[allow(clippy::unnecessary_wraps)] pub fn compact(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { - unsafe { - ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status( - primary_db_handler_or_error(&self.0.db)?.db, - column_family.0, - self.0.compaction_options, - ptr::null(), - 0, - ptr::null(), - 0, - ))?; + if let DbKind::ReadWrite(db) = &self.inner { + unsafe { + ffi_result!(rocksdb_transactiondb_compact_range_cf_opt_with_status( + db.db, + column_family.0, + db.compaction_options, + ptr::null(), + 0, + ptr::null(), + 0, + )) + }?; + Ok(()) + } else { + Err(StorageError::Other( + "Compaction is only possible on read-write instances".into(), + )) } - Ok(()) } pub fn new_sst_file(&self) -> Result { - unsafe { - let path = self.0.db.primary_path().join(random::().to_string()); - let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options); - ffi_result!(rocksdb_sstfilewriter_open_with_status( - writer, - path_to_cstring(&path)?.as_ptr() + if let DbKind::ReadWrite(db) = &self.inner { + let path = db.path.join(random::().to_string()); + unsafe { + let writer = rocksdb_sstfilewriter_create(db.env_options, db.options); + ffi_result!(rocksdb_sstfilewriter_open_with_status( + writer, + path_to_cstring(&path)?.as_ptr() + )) + .map_err(|e| { + rocksdb_sstfilewriter_destroy(writer); + e + })?; + Ok(SstFileWriter { writer, path }) + } + } else { + Err(StorageError::Other( + "SST creation is only possible on read-write instances".into(), )) - .map_err(|e| { - rocksdb_sstfilewriter_destroy(writer); - e - })?; - Ok(SstFileWriter { writer, path }) } } @@ -746,56 +804,66 @@ impl Db { &self, ssts_for_cf: &[(&ColumnFamily, PathBuf)], ) -> Result<(), StorageError> { - let mut paths_by_cf = HashMap::<_, Vec<_>>::new(); - for (cf, path) in ssts_for_cf { - paths_by_cf - .entry(*cf) - .or_default() - .push(path_to_cstring(path)?); - } - let cpaths_by_cf = paths_by_cf - .iter() - .map(|(cf, paths)| (*cf, paths.iter().map(|p| p.as_ptr()).collect::>())) - .collect::>(); - let args = cpaths_by_cf - .iter() - .map(|(cf, p)| rocksdb_ingestexternalfilearg_t { - column_family: cf.0, - external_files: p.as_ptr(), - external_files_len: p.len(), - 
options: self.0.ingest_external_file_options, - }) - .collect::>(); - unsafe { - ffi_result!(rocksdb_transactiondb_ingest_external_files_with_status( - primary_db_handler_or_error(&self.0.db)?.db, - args.as_ptr(), - args.len() - ))?; + if let DbKind::ReadWrite(db) = &self.inner { + let mut paths_by_cf = HashMap::<_, Vec<_>>::new(); + for (cf, path) in ssts_for_cf { + paths_by_cf + .entry(*cf) + .or_default() + .push(path_to_cstring(path)?); + } + let cpaths_by_cf = paths_by_cf + .iter() + .map(|(cf, paths)| (*cf, paths.iter().map(|p| p.as_ptr()).collect::>())) + .collect::>(); + let args = cpaths_by_cf + .iter() + .map(|(cf, p)| rocksdb_ingestexternalfilearg_t { + column_family: cf.0, + external_files: p.as_ptr(), + external_files_len: p.len(), + options: db.ingest_external_file_options, + }) + .collect::>(); + unsafe { + ffi_result!(rocksdb_transactiondb_ingest_external_files_with_status( + db.db, + args.as_ptr(), + args.len() + ))?; + } + Ok(()) + } else { + Err(StorageError::Other( + "SST ingestion is only possible on read-write instances".into(), + )) } - Ok(()) } pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> { - match &self.0.db { - InnerDbHandler::Secondary(_) | InnerDbHandler::ReadOnly(_) => Err(StorageError::Other( - "It is not possible to backup an database opened as secondary or read-only.".into(), - )), - InnerDbHandler::InMemory(_) => Err(StorageError::Other( - "It is not possible to backup an in-memory database created with `Store::new`" - .into(), - )), - InnerDbHandler::Primary(db) => { - let path = path_to_cstring(target_directory)?; + let path = path_to_cstring(target_directory)?; + match &self.inner { + DbKind::ReadOnly(db) => unsafe { + if db.is_secondary { + ffi_result!(rocksdb_try_catch_up_with_primary_with_status(db.db))?; + } + ffi_result!(rocksdb_create_checkpoint_with_status(db.db, path.as_ptr())) + }, + DbKind::ReadWrite(db) => { + if db.in_memory { + return Err(StorageError::Other( + "It is not possible to backup an in-memory database".into(), + )); + } unsafe { ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status( db.db, path.as_ptr() - ))?; + )) } - Ok(()) } - } + }?; + Ok(()) } } @@ -814,26 +882,19 @@ pub struct Reader { #[derive(Clone)] enum InnerReader { - Snapshot(Rc), + TransactionalSnapshot(Rc), Transaction(Weak<*mut rocksdb_transaction_t>), - Secondary(Arc), // TODO: we should not hold the whole enum, because we have to use - // secondary_db_handler_or_error to access rocksdb_t. we should - // safe it directly?! 
+ PlainDb(Arc), } -struct InnerSnapshot { - db: Arc, // TODO: probably the same as InnerReader::Secondary +struct TransactionalSnapshot { + db: Arc, snapshot: *const rocksdb_snapshot_t, } -impl Drop for InnerSnapshot { +impl Drop for TransactionalSnapshot { fn drop(&mut self) { - unsafe { - rocksdb_transactiondb_release_snapshot( - primary_db_handler_or_error(&self.db.db).unwrap().db, - self.snapshot, - ) - } + unsafe { rocksdb_transactiondb_release_snapshot(self.db.db, self.snapshot) } } } @@ -860,9 +921,9 @@ impl Reader { ) -> Result, StorageError> { unsafe { let slice = match &self.inner { - InnerReader::Snapshot(inner) => { + InnerReader::TransactionalSnapshot(inner) => { ffi_result!(rocksdb_transactiondb_get_pinned_cf_with_status( - primary_db_handler_or_error(&inner.db.db)?.db, + inner.db.db, self.options, column_family.0, key.as_ptr() as *const c_char, @@ -884,9 +945,9 @@ impl Reader { )); } } - InnerReader::Secondary(inner) => { + InnerReader::PlainDb(inner) => { ffi_result!(rocksdb_get_pinned_cf_with_status( - secondary_db_handler_or_error(&inner.db)?.db, + inner.db, self.options, column_family.0, key.as_ptr() as *const c_char, @@ -951,11 +1012,9 @@ impl Reader { ); } let iter = match &self.inner { - InnerReader::Snapshot(inner) => rocksdb_transactiondb_create_iterator_cf( - primary_db_handler_or_error(&inner.db.db)?.db, - options, - column_family.0, - ), + InnerReader::TransactionalSnapshot(inner) => { + rocksdb_transactiondb_create_iterator_cf(inner.db.db, options, column_family.0) + } InnerReader::Transaction(inner) => { if let Some(inner) = inner.upgrade() { rocksdb_transaction_create_iterator_cf(*inner, options, column_family.0) @@ -965,9 +1024,8 @@ impl Reader { )); } } - InnerReader::Secondary(inner) => { - let db = secondary_db_handler_or_error(&inner.db)?.db; - rocksdb_create_iterator_cf(db, options, column_family.0) + InnerReader::PlainDb(inner) => { + rocksdb_create_iterator_cf(inner.db, options, column_family.0) } }; assert!(!iter.is_null(), "rocksdb_create_iterator returned null"); diff --git a/lib/tests/store.rs b/lib/tests/store.rs index 8ab6eead..0f696617 100644 --- a/lib/tests/store.rs +++ b/lib/tests/store.rs @@ -312,17 +312,34 @@ fn test_backup() -> Result<(), Box> { GraphNameRef::DefaultGraph, ); let store_dir = TempDir::default(); - let backup_dir = TempDir::default(); + let secondary_store_dir = TempDir::default(); + let backup_from_rw_dir = TempDir::default(); + let backup_from_ro_dir = TempDir::default(); + let backup_from_secondary_dir = TempDir::default(); let store = Store::open(&store_dir)?; store.insert(quad)?; - store.backup(&backup_dir)?; - store.remove(quad)?; + let secondary_store = Store::open_secondary(&store_dir, secondary_store_dir)?; + store.flush()?; + store.backup(&backup_from_rw_dir)?; + secondary_store.backup(&backup_from_secondary_dir)?; + store.remove(quad)?; assert!(!store.contains(quad)?); - let backup = Store::open(&backup_dir.0)?; - backup.validate()?; - assert!(backup.contains(quad)?); + + let backup_from_rw = Store::open_read_only(&backup_from_rw_dir.0)?; + backup_from_rw.validate()?; + assert!(backup_from_rw.contains(quad)?); + backup_from_rw.backup(&backup_from_ro_dir)?; + + let backup_from_ro = Store::open_read_only(&backup_from_ro_dir.0)?; + backup_from_ro.validate()?; + assert!(backup_from_ro.contains(quad)?); + + let backup_from_secondary = Store::open_read_only(&backup_from_secondary_dir.0)?; + backup_from_secondary.validate()?; + assert!(backup_from_secondary.contains(quad)?); + Ok(()) } diff --git a/oxrocksdb-sys/api/c.cc 
b/oxrocksdb-sys/api/c.cc
index 95be839d..49c5e55a 100644
--- a/oxrocksdb-sys/api/c.cc
+++ b/oxrocksdb-sys/api/c.cc
@@ -106,6 +106,20 @@ rocksdb_t* rocksdb_open_as_secondary_column_families_with_status(
   return result;
 }
 
+void rocksdb_create_checkpoint_with_status(rocksdb_t* db,
+                                           const char* checkpoint_dir,
+                                           rocksdb_status_t* statusptr) {
+  Checkpoint* checkpoint;
+  Status s = Checkpoint::Create(db->rep, &checkpoint);
+  if (!s.ok()) {
+    SaveStatus(statusptr, s);
+    return;
+  }
+  SaveStatus(statusptr,
+             checkpoint->CreateCheckpoint(std::string(checkpoint_dir)));
+  delete checkpoint;
+}
+
 rocksdb_transactiondb_t* rocksdb_transactiondb_open_column_families_with_status(
     const rocksdb_options_t* options,
     const rocksdb_transactiondb_options_t* txn_db_options, const char* name,
diff --git a/oxrocksdb-sys/api/c.h b/oxrocksdb-sys/api/c.h
index 677e1d66..0200b91e 100644
--- a/oxrocksdb-sys/api/c.h
+++ b/oxrocksdb-sys/api/c.h
@@ -90,6 +90,9 @@ rocksdb_open_as_secondary_column_families_with_status(
     rocksdb_column_family_handle_t** column_family_handles,
     rocksdb_status_t* statusptr);
 
+extern ROCKSDB_LIBRARY_API void rocksdb_create_checkpoint_with_status(
+    rocksdb_t* db, const char* checkpoint_dir, rocksdb_status_t* statusptr);
+
 extern ROCKSDB_LIBRARY_API rocksdb_transactiondb_t*
 rocksdb_transactiondb_open_column_families_with_status(
     const rocksdb_options_t* options,
diff --git a/server/src/main.rs b/server/src/main.rs
index 25592090..9ae53bba 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -56,8 +56,9 @@ enum Command {
     },
     /// Start Oxigraph HTTP server in read-only mode.
     ///
-    /// It allows to read the database while other processes are accessing it.
-    /// Changes done after this process has been launched will not be seen.
+    /// It allows reading the database while other processes are also reading it.
+    /// Opening as read-only while another process is writing to the database is undefined behavior.
+    /// Use the serve-secondary command in that case.
     ServeReadOnly {
         /// Directory in which the data stored by Oxigraph are persisted.
         #[arg(short, long)]
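
A minimal usage sketch of the store-level API that the test_backup changes above exercise (Store::open, Store::open_secondary, Store::open_read_only, Store::flush, Store::validate and Store::backup). It is not part of the patch; the directory names and the surrounding function are illustrative only.

use oxigraph::store::Store;

fn example() -> Result<(), Box<dyn std::error::Error>> {
    // Read-write instance: the only kind that accepts inserts, transactions,
    // flushes, compaction and SST ingestion.
    let store = Store::open("data")?;
    store.flush()?;

    // Secondary instance: read-only, but it can catch up with the primary
    // even while another process keeps writing to "data".
    let secondary = Store::open_secondary("data", "secondary-data")?;

    // Plain read-only instance: assumes no other process is writing to "data".
    let read_only = Store::open_read_only("data")?;
    read_only.validate()?;

    // All three can now produce a checkpoint-based backup; the read-only and
    // secondary ones go through the rocksdb_create_checkpoint_with_status
    // wrapper added above.
    store.backup("backup-from-rw")?;
    secondary.backup("backup-from-secondary")?;
    Ok(())
}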