RocksDB: uses multi-columns flush

pull/779/merge
Tpt 10 months ago committed by Thomas Tanon
parent efae84b5f8
commit c277804026
  1. 17
      lib/oxigraph/src/storage/backend/rocksdb.rs
  2. 14
      lib/oxigraph/src/storage/mod.rs
  3. 10
      oxrocksdb-sys/api/c.cc
  4. 5
      oxrocksdb-sys/api/c.h

@ -9,7 +9,7 @@
)] )]
use crate::storage::error::{CorruptionError, StorageError}; use crate::storage::error::{CorruptionError, StorageError};
use libc::{self, c_void, free}; use libc::{self, c_void};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use rand::random; use rand::random;
use std::borrow::Borrow; use std::borrow::Borrow;
@ -625,7 +625,7 @@ impl Db {
ffi_result!(rocksdb_transaction_commit_with_status(transaction)); ffi_result!(rocksdb_transaction_commit_with_status(transaction));
rocksdb_transaction_destroy(transaction); rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options); rocksdb_readoptions_destroy(read_options);
free(snapshot as *mut c_void); rocksdb_free(snapshot as *mut c_void);
r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails
} }
return Ok(result); return Ok(result);
@ -636,7 +636,7 @@ impl Db {
ffi_result!(rocksdb_transaction_rollback_with_status(transaction)); ffi_result!(rocksdb_transaction_rollback_with_status(transaction));
rocksdb_transaction_destroy(transaction); rocksdb_transaction_destroy(transaction);
rocksdb_readoptions_destroy(read_options); rocksdb_readoptions_destroy(read_options);
free(snapshot as *mut c_void); rocksdb_free(snapshot as *mut c_void);
r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails r.map_err(StorageError::from)?; // We make sure to also run destructors if the commit fails
} }
// We look for the root error // We look for the root error
@ -738,13 +738,14 @@ impl Db {
} }
} }
pub fn flush(&self, column_family: &ColumnFamily) -> Result<(), StorageError> { pub fn flush(&self) -> Result<(), StorageError> {
if let DbKind::ReadWrite(db) = &self.inner { if let DbKind::ReadWrite(db) = &self.inner {
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf_with_status( ffi_result!(rocksdb_transactiondb_flush_cfs_with_status(
db.db, db.db,
db.flush_options, db.flush_options,
column_family.0, db.cf_handles.as_ptr().cast_mut(),
db.cf_handles.len().try_into().unwrap()
)) ))
}?; }?;
Ok(()) Ok(())
@ -1190,7 +1191,7 @@ pub struct Buffer {
impl Drop for Buffer { impl Drop for Buffer {
fn drop(&mut self) { fn drop(&mut self) {
unsafe { unsafe {
free(self.base.cast()); rocksdb_free(self.base.cast());
} }
} }
} }
@ -1325,7 +1326,7 @@ impl Drop for ErrorStatus {
fn drop(&mut self) { fn drop(&mut self) {
if !self.0.string.is_null() { if !self.0.string.is_null() {
unsafe { unsafe {
free(self.0.string as *mut c_void); rocksdb_free(self.0.string as *mut c_void);
} }
} }
} }

@ -258,7 +258,7 @@ impl Storage {
fn update_version(&self, version: u64) -> Result<(), StorageError> { fn update_version(&self, version: u64) -> Result<(), StorageError> {
self.db self.db
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?; .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
self.db.flush(&self.default_cf) self.db.flush()
} }
pub fn snapshot(&self) -> StorageReader { pub fn snapshot(&self) -> StorageReader {
@ -283,17 +283,7 @@ impl Storage {
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]
pub fn flush(&self) -> Result<(), StorageError> { pub fn flush(&self) -> Result<(), StorageError> {
self.db.flush(&self.default_cf)?; self.db.flush()
self.db.flush(&self.gspo_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gosp_cf)?;
self.db.flush(&self.spog_cf)?;
self.db.flush(&self.posg_cf)?;
self.db.flush(&self.ospg_cf)?;
self.db.flush(&self.dspo_cf)?;
self.db.flush(&self.dpos_cf)?;
self.db.flush(&self.dosp_cf)?;
self.db.flush(&self.id2str_cf)
} }
#[cfg(not(target_family = "wasm"))] #[cfg(not(target_family = "wasm"))]

@ -272,11 +272,15 @@ void rocksdb_transactiondb_put_cf_with_status(
Slice(key, keylen), Slice(val, vallen))); Slice(key, keylen), Slice(val, vallen)));
} }
void rocksdb_transactiondb_flush_cf_with_status( void rocksdb_transactiondb_flush_cfs_with_status(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, rocksdb_column_family_handle_t** column_families, int num_column_families,
rocksdb_status_t* statusptr) { rocksdb_status_t* statusptr) {
SaveStatus(statusptr, db->rep->Flush(options->rep, column_family->rep)); vector<ColumnFamilyHandle*> column_family_handles(num_column_families);
for (int i = 0; i < num_column_families; i++) {
column_family_handles[i] = column_families[i]->rep;
}
SaveStatus(statusptr, db->rep->Flush(options->rep, column_family_handles));
} }
void rocksdb_transactiondb_compact_range_cf_opt_with_status( void rocksdb_transactiondb_compact_range_cf_opt_with_status(

@ -114,9 +114,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_put_cf_with_status(
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, const char* val, size_t vallen, rocksdb_status_t* statusptr); size_t keylen, const char* val, size_t vallen, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf_with_status( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cfs_with_status(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, rocksdb_status_t* statusptr); rocksdb_column_family_handle_t** column_families, int num_column_families,
rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void extern ROCKSDB_LIBRARY_API void
rocksdb_transactiondb_compact_range_cf_opt_with_status( rocksdb_transactiondb_compact_range_cf_opt_with_status(

Loading…
Cancel
Save