From c277804026180ac1bab695be633857dd3a7bfb8a Mon Sep 17 00:00:00 2001 From: Tpt Date: Thu, 15 Feb 2024 21:57:46 +0100 Subject: [PATCH] RocksDB: uses multi-columns flush --- lib/oxigraph/src/storage/backend/rocksdb.rs | 17 +++++++++-------- lib/oxigraph/src/storage/mod.rs | 14 ++------------ oxrocksdb-sys/api/c.cc | 10 +++++++--- oxrocksdb-sys/api/c.h | 5 +++-- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/lib/oxigraph/src/storage/backend/rocksdb.rs b/lib/oxigraph/src/storage/backend/rocksdb.rs index 14c45d0a..6f5561f4 100644 --- a/lib/oxigraph/src/storage/backend/rocksdb.rs +++ b/lib/oxigraph/src/storage/backend/rocksdb.rs @@ -9,7 +9,7 @@ )] use crate::storage::error::{CorruptionError, StorageError}; -use libc::{self, c_void, free}; +use libc::{self, c_void}; use oxrocksdb_sys::*; use rand::random; use std::borrow::Borrow; @@ -625,7 +625,7 @@ impl Db { ffi_result!(rocksdb_transaction_commit_with_status(transaction)); rocksdb_transaction_destroy(transaction); rocksdb_readoptions_destroy(read_options); - free(snapshot as *mut c_void); + rocksdb_free(snapshot as *mut c_void); r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails } return Ok(result); @@ -636,7 +636,7 @@ impl Db { ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); rocksdb_transaction_destroy(transaction); rocksdb_readoptions_destroy(read_options); - free(snapshot as *mut c_void); + rocksdb_free(snapshot as *mut c_void); r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails } // We look for the root error @@ -738,13 +738,14 @@ impl Db { } } - pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { + pub fn flush(&self) -> Result<(), StorageError> { if let DbKind::ReadWrite(db) = &self.inner { unsafe { - ffi_result!(rocksdb_transactiondb_flush_cf_with_status( + ffi_result!(rocksdb_transactiondb_flush_cfs_with_status( db.db, db.flush_options, - column_family.0, + db.cf_handles.as_ptr().cast_mut(), + db.cf_handles.len().try_into().unwrap() )) }?; Ok(()) @@ -1190,7 +1191,7 @@ pub struct Buffer { impl Drop for Buffer { fn drop(&mut self) { unsafe { - free(self.base.cast()); + rocksdb_free(self.base.cast()); } } } @@ -1325,7 +1326,7 @@ impl Drop for ErrorStatus { fn drop(&mut self) { if !self.0.string.is_null() { unsafe { - free(self.0.string as *mut c_void); + rocksdb_free(self.0.string as *mut c_void); } } } diff --git a/lib/oxigraph/src/storage/mod.rs b/lib/oxigraph/src/storage/mod.rs index 1f77d1b0..10d26c32 100644 --- a/lib/oxigraph/src/storage/mod.rs +++ b/lib/oxigraph/src/storage/mod.rs @@ -258,7 +258,7 @@ impl Storage { fn update_version(&self, version: u64) -> Result<(), StorageError> { self.db .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; - self.db.flush(&self.default_cf) + self.db.flush() } pub fn snapshot(&self) -> StorageReader { @@ -283,17 +283,7 @@ impl Storage { #[cfg(not(target_family = "wasm"))] pub fn flush(&self) -> Result<(), StorageError> { - self.db.flush(&self.default_cf)?; - self.db.flush(&self.gspo_cf)?; - self.db.flush(&self.gpos_cf)?; - self.db.flush(&self.gosp_cf)?; - self.db.flush(&self.spog_cf)?; - self.db.flush(&self.posg_cf)?; - self.db.flush(&self.ospg_cf)?; - self.db.flush(&self.dspo_cf)?; - self.db.flush(&self.dpos_cf)?; - self.db.flush(&self.dosp_cf)?; - self.db.flush(&self.id2str_cf) + self.db.flush() } #[cfg(not(target_family = "wasm"))] diff --git a/oxrocksdb-sys/api/c.cc b/oxrocksdb-sys/api/c.cc index 9a948ea2..b62ee7ff 100644 --- a/oxrocksdb-sys/api/c.cc +++ b/oxrocksdb-sys/api/c.cc @@ -272,11 +272,15 @@ void rocksdb_transactiondb_put_cf_with_status( Slice(key, keylen), Slice(val, vallen))); } -void rocksdb_transactiondb_flush_cf_with_status( +void rocksdb_transactiondb_flush_cfs_with_status( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, - rocksdb_column_family_handle_t* column_family, + rocksdb_column_family_handle_t** column_families, int num_column_families, rocksdb_status_t* statusptr) { - SaveStatus(statusptr, db->rep->Flush(options->rep, column_family->rep)); + vector column_family_handles(num_column_families); + for (int i = 0; i < num_column_families; i++) { + column_family_handles[i] = column_families[i]->rep; + } + SaveStatus(statusptr, db->rep->Flush(options->rep, column_family_handles)); } void rocksdb_transactiondb_compact_range_cf_opt_with_status( diff --git a/oxrocksdb-sys/api/c.h b/oxrocksdb-sys/api/c.h index 022e8f97..8b7c173e 100644 --- a/oxrocksdb-sys/api/c.h +++ b/oxrocksdb-sys/api/c.h @@ -114,9 +114,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_put_cf_with_status( rocksdb_column_family_handle_t* column_family, const char* key, size_t keylen, const char* val, size_t vallen, rocksdb_status_t* statusptr); -extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status( +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cfs_with_status( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, - rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr); + rocksdb_column_family_handle_t** column_families, int num_column_families, + rocksdb_status_t* statusptr); extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt_with_status(