Uses transactional RocksDB

First stop to real transaction support
pull/171/head
Tpt 3 years ago
parent 3fd0332e32
commit dfefe6cd1c
  1. 10
      lib/src/storage/fallback_backend.rs
  2. 38
      lib/src/storage/mod.rs
  3. 68
      lib/src/storage/rocksdb_backend.rs
  4. 8
      python/src/store.rs
  5. 23
      rocksdb-sys/api/c.cc
  6. 16
      rocksdb-sys/api/c.h

@ -76,16 +76,6 @@ impl Db {
.is_some())
}
pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> {
Ok(self
.0
.write()
.unwrap()
.get_mut(column_family)
.unwrap()
.clear())
}
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[])
}

@ -624,14 +624,17 @@ impl Storage {
}
pub fn remove_named_graph(&self, graph_name: NamedOrBlankNodeRef<'_>) -> std::io::Result<bool> {
let graph_name = graph_name.into();
for quad in self.quads_for_graph(&graph_name) {
self.remove_encoded_named_graph(&graph_name.into())
}
fn remove_encoded_named_graph(&self, graph_name: &EncodedTerm) -> std::io::Result<bool> {
for quad in self.quads_for_graph(graph_name) {
self.remove_encoded(&quad?)?;
}
let encoded_graph = encode_term(&graph_name);
let encoded_graph = encode_term(graph_name);
Ok(if self.db.contains_key(&self.graphs_cf, &encoded_graph)? {
self.db.remove(&self.graphs_cf, &encoded_graph)?;
self.remove_term(&graph_name)?;
self.remove_term(graph_name)?;
true
} else {
false
@ -639,28 +642,19 @@ impl Storage {
}
pub fn remove_all_named_graphs(&self) -> std::io::Result<()> {
self.db.clear(&self.gspo_cf)?;
self.db.clear(&self.gpos_cf)?;
self.db.clear(&self.gosp_cf)?;
self.db.clear(&self.spog_cf)?;
self.db.clear(&self.posg_cf)?;
self.db.clear(&self.ospg_cf)?;
self.db.clear(&self.graphs_cf)?;
for graph_name in self.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
Ok(())
}
pub fn clear(&self) -> std::io::Result<()> {
self.db.clear(&self.dspo_cf)?;
self.db.clear(&self.dpos_cf)?;
self.db.clear(&self.dosp_cf)?;
self.db.clear(&self.gspo_cf)?;
self.db.clear(&self.gpos_cf)?;
self.db.clear(&self.gosp_cf)?;
self.db.clear(&self.spog_cf)?;
self.db.clear(&self.posg_cf)?;
self.db.clear(&self.ospg_cf)?;
self.db.clear(&self.graphs_cf)?;
self.db.clear(&self.id2str_cf)?;
for graph_name in self.named_graphs() {
self.remove_encoded_named_graph(&graph_name?)?;
}
for quad in self.quads() {
self.remove_encoded(&quad?)?;
}
Ok(())
}

@ -44,8 +44,9 @@ unsafe impl Send for Db {}
unsafe impl Sync for Db {}
struct DbHandler {
db: *mut rocksdb_t,
db: *mut rocksdb_transactiondb_t,
options: *mut rocksdb_options_t,
txn_options: *mut rocksdb_transactiondb_options_t,
env: Option<*mut rocksdb_env_t>,
column_families: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
@ -54,7 +55,11 @@ struct DbHandler {
impl Drop for DbHandler {
fn drop(&mut self) {
unsafe {
rocksdb_close(self.db);
for cf_handle in &self.cf_handles {
rocksdb_column_family_handle_destroy(*cf_handle);
}
rocksdb_transactiondb_close(self.db);
rocksdb_transactiondb_options_destroy(self.txn_options);
rocksdb_options_destroy(self.options);
if let Some(env) = self.env {
rocksdb_env_destroy(env);
@ -94,10 +99,18 @@ impl Db {
assert!(!options.is_null(), "rocksdb_options_create returned null");
rocksdb_options_set_create_if_missing(options, 1);
rocksdb_options_set_create_missing_column_families(options, 1);
let txn_options = rocksdb_transactiondb_options_create();
assert!(
!txn_options.is_null(),
"rocksdb_transactiondb_options_create returned null"
);
let env = if in_memory {
let env = rocksdb_create_mem_env();
if env.is_null() {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
return Err(other_error("Not able to create an in-memory environment."));
}
rocksdb_options_set_env(options, env);
@ -115,19 +128,13 @@ impl Db {
.map(|cf| CString::new(*cf))
.collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?;
let cf_options = column_families
.iter()
.map(|_| {
let options: *const rocksdb_options_t = rocksdb_options_create();
assert!(!options.is_null(), "rocksdb_options_create returned null");
options
})
.collect::<Vec<_>>();
let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()];
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_families.len()];
let db = ffi_result!(rocksdb_open_column_families(
let db = ffi_result!(rocksdb_transactiondb_open_column_families(
options,
txn_options,
c_path.as_ptr(),
column_families.len().try_into().unwrap(),
c_column_families
@ -140,6 +147,7 @@ impl Db {
))
.map_err(|e| {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
@ -148,8 +156,9 @@ impl Db {
assert!(!db.is_null(), "rocksdb_create returned null");
for handle in &cf_handles {
if handle.is_null() {
rocksdb_close(db);
rocksdb_transactiondb_close(db);
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
@ -162,6 +171,7 @@ impl Db {
Ok(DbHandler {
db,
options,
txn_options,
env,
column_families,
cf_handles,
@ -185,7 +195,7 @@ impl Db {
!options.is_null(),
"rocksdb_flushoptions_create returned null"
);
let r = ffi_result!(rocksdb_flush(self.0.db, options));
let r = ffi_result!(rocksdb_transactiondb_flush(self.0.db, options));
rocksdb_flushoptions_destroy(options);
r
}
@ -202,7 +212,7 @@ impl Db {
!options.is_null(),
"rocksdb_readoptions_create returned null"
);
let r = ffi_result!(rocksdb_get_pinned_cf(
let r = ffi_result!(rocksdb_transactiondb_get_pinned_cf(
self.0.db,
options,
column_family.0,
@ -233,7 +243,7 @@ impl Db {
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let r = ffi_result!(rocksdb_put_cf(
let r = ffi_result!(rocksdb_transactiondb_put_cf(
self.0.db,
options,
column_family.0,
@ -258,7 +268,7 @@ impl Db {
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let r = ffi_result!(rocksdb_delete_cf(
let r = ffi_result!(rocksdb_transactiondb_delete_cf(
self.0.db,
options,
column_family.0,
@ -270,29 +280,6 @@ impl Db {
}
}
pub fn clear(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe {
let options = rocksdb_writeoptions_create();
assert!(
!options.is_null(),
"rocksdb_writeoptions_create returned null"
);
let start = [];
let end = [c_char::MAX; 257];
let r = ffi_result!(rocksdb_delete_range_cf(
self.0.db,
options,
column_family.0,
start.as_ptr(),
start.len(),
end.as_ptr(),
end.len(),
));
rocksdb_writeoptions_destroy(options);
r
}
}
pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[])
}
@ -304,7 +291,8 @@ impl Db {
!options.is_null(),
"rocksdb_readoptions_create returned null"
);
let iter = rocksdb_create_iterator_cf(self.0.db, options, column_family.0);
let iter =
rocksdb_transactiondb_create_iterator_cf(self.0.db, options, column_family.0);
assert!(!options.is_null(), "rocksdb_create_iterator returned null");
if prefix.is_empty() {
rocksdb_iter_seek_to_first(iter);

@ -433,15 +433,15 @@ impl PyObjectProtocol for PyStore {
self.inner.to_string()
}
fn __bool__(&self) -> bool {
!self.inner.is_empty()
fn __bool__(&self) -> PyResult<bool> {
Ok(!self.inner.is_empty()?)
}
}
#[pyproto]
impl PySequenceProtocol for PyStore {
fn __len__(&self) -> usize {
self.inner.len()
fn __len__(&self) -> PyResult<usize> {
Ok(self.inner.len()?)
}
fn __contains__(&self, quad: PyQuad) -> PyResult<bool> {

@ -1,9 +1,30 @@
#include "../rocksdb/db/c.cc"
#include "c.h"
extern "C" {
rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {
delete v;
if (!s.IsNotFound()) {
SaveError(errptr, s);
}
return nullptr;
}
return v;
}
void rocksdb_transactiondb_flush(
rocksdb_t* db,
rocksdb_transactiondb_t* db,
const rocksdb_flushoptions_t* options,
char** errptr) {
SaveError(errptr, db->rep->Flush(options->rep));
}
}

@ -2,5 +2,19 @@
#include "../rocksdb/include/rocksdb/c.h"
#ifdef __cplusplus
extern "C" {
#endif
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr);
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr);
#ifdef __cplusplus
}
#endif

Loading…
Cancel
Save