From dfefe6cd1c58d6779f020a26e6d1ae99a74ec88e Mon Sep 17 00:00:00 2001 From: Tpt Date: Mon, 25 Oct 2021 21:45:47 +0200 Subject: [PATCH] Uses transactional RocksDB First stop to real transaction support --- lib/src/storage/fallback_backend.rs | 10 ----- lib/src/storage/mod.rs | 38 +++++++--------- lib/src/storage/rocksdb_backend.rs | 68 ++++++++++++----------------- python/src/store.rs | 8 ++-- rocksdb-sys/api/c.cc | 23 +++++++++- rocksdb-sys/api/c.h | 16 ++++++- 6 files changed, 85 insertions(+), 78 deletions(-) diff --git a/lib/src/storage/fallback_backend.rs b/lib/src/storage/fallback_backend.rs index 3b90a8e3..626dc001 100644 --- a/lib/src/storage/fallback_backend.rs +++ b/lib/src/storage/fallback_backend.rs @@ -76,16 +76,6 @@ impl Db { .is_some()) } - pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> { - Ok(self - .0 - .write() - .unwrap() - .get_mut(column_family) - .unwrap() - .clear()) - } - pub fn iter(&self, column_family: &ColumnFamily) -> Iter { self.scan_prefix(column_family, &[]) } diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index ac76505d..ff8af110 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -624,14 +624,17 @@ impl Storage { } pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result { - let graph_name = graph_name.into(); - for quad in self.quads_for_graph(&graph_name) { + self.remove_encoded_named_graph(&graph_name.into()) + } + + fn remove_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result { + for quad in self.quads_for_graph(graph_name) { self.remove_encoded(&quad?)?; } - let encoded_graph = encode_term(&graph_name); + let encoded_graph = encode_term(graph_name); Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? { self.db.remove(&self.graphs_cf, &encoded_graph)?; - self.remove_term(&graph_name)?; + self.remove_term(graph_name)?; true } else { false @@ -639,28 +642,19 @@ impl Storage { } pub fn remove_all_named_graphs(&self) -> std::io::Result<()> { - self.db.clear(&self.gspo_cf)?; - self.db.clear(&self.gpos_cf)?; - self.db.clear(&self.gosp_cf)?; - self.db.clear(&self.spog_cf)?; - self.db.clear(&self.posg_cf)?; - self.db.clear(&self.ospg_cf)?; - self.db.clear(&self.graphs_cf)?; + for graph_name in self.named_graphs() { + self.remove_encoded_named_graph(&graph_name?)?; + } Ok(()) } pub fn clear(&self) -> std::io::Result<()> { - self.db.clear(&self.dspo_cf)?; - self.db.clear(&self.dpos_cf)?; - self.db.clear(&self.dosp_cf)?; - self.db.clear(&self.gspo_cf)?; - self.db.clear(&self.gpos_cf)?; - self.db.clear(&self.gosp_cf)?; - self.db.clear(&self.spog_cf)?; - self.db.clear(&self.posg_cf)?; - self.db.clear(&self.ospg_cf)?; - self.db.clear(&self.graphs_cf)?; - self.db.clear(&self.id2str_cf)?; + for graph_name in self.named_graphs() { + self.remove_encoded_named_graph(&graph_name?)?; + } + for quad in self.quads() { + self.remove_encoded(&quad?)?; + } Ok(()) } diff --git a/lib/src/storage/rocksdb_backend.rs b/lib/src/storage/rocksdb_backend.rs index 88a625fa..86a948a5 100644 --- a/lib/src/storage/rocksdb_backend.rs +++ b/lib/src/storage/rocksdb_backend.rs @@ -44,8 +44,9 @@ unsafe impl Send for Db {} unsafe impl Sync for Db {} struct DbHandler { - db: *mut rocksdb_t, + db: *mut rocksdb_transactiondb_t, options: *mut rocksdb_options_t, + txn_options: *mut rocksdb_transactiondb_options_t, env: Option<*mut rocksdb_env_t>, column_families: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, @@ -54,7 +55,11 @@ struct DbHandler { impl Drop for DbHandler { fn drop(&mut self) { unsafe { - rocksdb_close(self.db); + for cf_handle in &self.cf_handles { + rocksdb_column_family_handle_destroy(*cf_handle); + } + rocksdb_transactiondb_close(self.db); + rocksdb_transactiondb_options_destroy(self.txn_options); rocksdb_options_destroy(self.options); if let Some(env) = self.env { rocksdb_env_destroy(env); @@ -94,10 +99,18 @@ impl Db { assert!(!options.is_null(), "rocksdb_options_create returned null"); rocksdb_options_set_create_if_missing(options, 1); rocksdb_options_set_create_missing_column_families(options, 1); + + let txn_options = rocksdb_transactiondb_options_create(); + assert!( + !txn_options.is_null(), + "rocksdb_transactiondb_options_create returned null" + ); + let env = if in_memory { let env = rocksdb_create_mem_env(); if env.is_null() { rocksdb_options_destroy(options); + rocksdb_transactiondb_options_destroy(txn_options); return Err(other_error("Not able to create an in-memory environment.")); } rocksdb_options_set_env(options, env); @@ -115,19 +128,13 @@ impl Db { .map(|cf| CString::new(*cf)) .collect::, _>>() .map_err(invalid_input_error)?; - let cf_options = column_families - .iter() - .map(|_| { - let options: *const rocksdb_options_t = rocksdb_options_create(); - assert!(!options.is_null(), "rocksdb_options_create returned null"); - options - }) - .collect::>(); + let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()]; let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = vec![ptr::null_mut(); column_families.len()]; - let db = ffi_result!(rocksdb_open_column_families( + let db = ffi_result!(rocksdb_transactiondb_open_column_families( options, + txn_options, c_path.as_ptr(), column_families.len().try_into().unwrap(), c_column_families @@ -140,6 +147,7 @@ impl Db { )) .map_err(|e| { rocksdb_options_destroy(options); + rocksdb_transactiondb_options_destroy(txn_options); if let Some(env) = env { rocksdb_env_destroy(env); } @@ -148,8 +156,9 @@ impl Db { assert!(!db.is_null(), "rocksdb_create returned null"); for handle in &cf_handles { if handle.is_null() { - rocksdb_close(db); + rocksdb_transactiondb_close(db); rocksdb_options_destroy(options); + rocksdb_transactiondb_options_destroy(txn_options); if let Some(env) = env { rocksdb_env_destroy(env); } @@ -162,6 +171,7 @@ impl Db { Ok(DbHandler { db, options, + txn_options, env, column_families, cf_handles, @@ -185,7 +195,7 @@ impl Db { !options.is_null(), "rocksdb_flushoptions_create returned null" ); - let r = ffi_result!(rocksdb_flush(self.0.db, options)); + let r = ffi_result!(rocksdb_transactiondb_flush(self.0.db, options)); rocksdb_flushoptions_destroy(options); r } @@ -202,7 +212,7 @@ impl Db { !options.is_null(), "rocksdb_readoptions_create returned null" ); - let r = ffi_result!(rocksdb_get_pinned_cf( + let r = ffi_result!(rocksdb_transactiondb_get_pinned_cf( self.0.db, options, column_family.0, @@ -233,7 +243,7 @@ impl Db { !options.is_null(), "rocksdb_writeoptions_create returned null" ); - let r = ffi_result!(rocksdb_put_cf( + let r = ffi_result!(rocksdb_transactiondb_put_cf( self.0.db, options, column_family.0, @@ -258,7 +268,7 @@ impl Db { !options.is_null(), "rocksdb_writeoptions_create returned null" ); - let r = ffi_result!(rocksdb_delete_cf( + let r = ffi_result!(rocksdb_transactiondb_delete_cf( self.0.db, options, column_family.0, @@ -270,29 +280,6 @@ impl Db { } } - pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> { - unsafe { - let options = rocksdb_writeoptions_create(); - assert!( - !options.is_null(), - "rocksdb_writeoptions_create returned null" - ); - let start = []; - let end = [c_char::MAX; 257]; - let r = ffi_result!(rocksdb_delete_range_cf( - self.0.db, - options, - column_family.0, - start.as_ptr(), - start.len(), - end.as_ptr(), - end.len(), - )); - rocksdb_writeoptions_destroy(options); - r - } - } - pub fn iter(&self, column_family: &ColumnFamily) -> Iter { self.scan_prefix(column_family, &[]) } @@ -304,7 +291,8 @@ impl Db { !options.is_null(), "rocksdb_readoptions_create returned null" ); - let iter = rocksdb_create_iterator_cf(self.0.db, options, column_family.0); + let iter = + rocksdb_transactiondb_create_iterator_cf(self.0.db, options, column_family.0); assert!(!options.is_null(), "rocksdb_create_iterator returned null"); if prefix.is_empty() { rocksdb_iter_seek_to_first(iter); diff --git a/python/src/store.rs b/python/src/store.rs index 8770f63c..5c7f5ddb 100644 --- a/python/src/store.rs +++ b/python/src/store.rs @@ -433,15 +433,15 @@ impl PyObjectProtocol for PyStore { self.inner.to_string() } - fn __bool__(&self) -> bool { - !self.inner.is_empty() + fn __bool__(&self) -> PyResult { + Ok(!self.inner.is_empty()?) } } #[pyproto] impl PySequenceProtocol for PyStore { - fn __len__(&self) -> usize { - self.inner.len() + fn __len__(&self) -> PyResult { + Ok(self.inner.len()?) } fn __contains__(&self, quad: PyQuad) -> PyResult { diff --git a/rocksdb-sys/api/c.cc b/rocksdb-sys/api/c.cc index 92d67086..0c51fce3 100644 --- a/rocksdb-sys/api/c.cc +++ b/rocksdb-sys/api/c.cc @@ -1,9 +1,30 @@ #include "../rocksdb/db/c.cc" #include "c.h" +extern "C" { + +rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( + rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, char** errptr) { + rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t); + Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen), + &v->rep); + if (!s.ok()) { + delete v; + if (!s.IsNotFound()) { + SaveError(errptr, s); + } + return nullptr; + } + return v; +} + void rocksdb_transactiondb_flush( - rocksdb_t* db, + rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr) { SaveError(errptr, db->rep->Flush(options->rep)); } + +} diff --git a/rocksdb-sys/api/c.h b/rocksdb-sys/api/c.h index 2c19738f..b97b760a 100644 --- a/rocksdb-sys/api/c.h +++ b/rocksdb-sys/api/c.h @@ -2,5 +2,19 @@ #include "../rocksdb/include/rocksdb/c.h" +#ifdef __cplusplus +extern "C" { +#endif + +extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( + rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, + rocksdb_column_family_handle_t* column_family, const char* key, + size_t keylen, char** errptr); + + extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush( - rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr); \ No newline at end of file + rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr); + +#ifdef __cplusplus +} +#endif