Share environment between RocksDB instances

Allow multiple RocksDB instance in the same process to share resources instead of fighting to get them.
pull/171/head
Tpt 3 years ago
parent 6f44a5956b
commit 2c304aa29d
  1. 1
      lib/Cargo.toml
  2. 71
      lib/src/storage/backend/rocksdb.rs

@ -38,6 +38,7 @@ hex = "0.4"
nom = "7" nom = "7"
siphasher = "0.3" siphasher = "0.3"
lasso = "0.5" lasso = "0.5"
lazy_static = "1"
sophia_api = { version = "0.7", optional = true } sophia_api = { version = "0.7", optional = true }
json-event-parser = "0.1" json-event-parser = "0.1"
num_cpus = "1" num_cpus = "1"

@ -6,6 +6,7 @@
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use crate::storage::backend::{CompactionAction, CompactionFilter}; use crate::storage::backend::{CompactionAction, CompactionFilter};
use lazy_static::lazy_static;
use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use rand::random; use rand::random;
@ -13,6 +14,7 @@ use std::borrow::Borrow;
use std::cell::Cell; use std::cell::Cell;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fs::remove_dir_all;
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::iter::Zip; use std::iter::Zip;
use std::ops::Deref; use std::ops::Deref;
@ -43,6 +45,23 @@ macro_rules! ffi_result_impl {
}} }}
} }
lazy_static! {
static ref ROCKSDB_ENV: UnsafeEnv = {
unsafe {
let env = rocksdb_create_default_env();
assert!(!env.is_null(), "rocksdb_create_default_env returned null");
UnsafeEnv(env)
}
};
static ref ROCKSDB_MEM_ENV: UnsafeEnv = {
unsafe {
let env = rocksdb_create_mem_env();
assert!(!env.is_null(), "rocksdb_create_mem_env returned null");
UnsafeEnv(env)
}
};
}
pub struct ColumnFamilyDefinition { pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub merge_operator: Option<MergeOperator>, pub merge_operator: Option<MergeOperator>,
@ -69,12 +88,12 @@ struct DbHandler {
ingest_external_file_options: *mut rocksdb_ingestexternalfileoptions_t, ingest_external_file_options: *mut rocksdb_ingestexternalfileoptions_t,
compaction_options: *mut rocksdb_compactoptions_t, compaction_options: *mut rocksdb_compactoptions_t,
block_based_table_options: *mut rocksdb_block_based_table_options_t, block_based_table_options: *mut rocksdb_block_based_table_options_t,
env: Option<*mut rocksdb_env_t>,
column_family_names: Vec<&'static str>, column_family_names: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>, cf_options: Vec<*mut rocksdb_options_t>,
cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>, cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>,
path: PathBuf, path: PathBuf,
remove_path: bool,
} }
impl Drop for DbHandler { impl Drop for DbHandler {
@ -97,13 +116,13 @@ impl Drop for DbHandler {
rocksdb_transactiondb_options_destroy(self.transactiondb_options); rocksdb_transactiondb_options_destroy(self.transactiondb_options);
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options); rocksdb_block_based_options_destroy(self.block_based_table_options);
if let Some(env) = self.env {
rocksdb_env_destroy(env);
}
for cf_compact in &self.cf_compaction_filters { for cf_compact in &self.cf_compaction_filters {
rocksdb_compactionfilter_destroy(*cf_compact); rocksdb_compactionfilter_destroy(*cf_compact);
} }
} }
if self.remove_path && self.path.exists() {
remove_dir_all(&self.path).unwrap();
}
} }
} }
@ -114,20 +133,24 @@ impl Db {
} else { } else {
temp_dir() temp_dir()
} }
.join("oxigraph-temp-rocksdb"); .join(format!("oxigraph-rocksdb-{}", random::<u128>()));
Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) Ok(Self(Arc::new(Self::do_open(path, column_families, true)?)))
} }
pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> { pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) Ok(Self(Arc::new(Self::do_open(
path.to_owned(),
column_families,
false,
)?)))
} }
fn do_open( fn do_open(
path: &Path, path: PathBuf,
mut column_families: Vec<ColumnFamilyDefinition>, mut column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool, in_memory: bool,
) -> Result<DbHandler> { ) -> Result<DbHandler> {
let c_path = path_to_cstring(path)?; let c_path = path_to_cstring(&path)?;
unsafe { unsafe {
let options = rocksdb_options_create(); let options = rocksdb_options_create();
@ -150,6 +173,14 @@ impl Db {
.try_into() .try_into()
.unwrap(), .unwrap(),
); );
rocksdb_options_set_env(
options,
if in_memory {
ROCKSDB_MEM_ENV.0
} else {
ROCKSDB_ENV.0
},
);
let block_based_table_options = rocksdb_block_based_options_create(); let block_based_table_options = rocksdb_block_based_options_create();
assert!( assert!(
!block_based_table_options.is_null(), !block_based_table_options.is_null(),
@ -168,19 +199,6 @@ impl Db {
"rocksdb_transactiondb_options_create returned null" "rocksdb_transactiondb_options_create returned null"
); );
let env = if in_memory {
let env = rocksdb_create_mem_env();
if env.is_null() {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(transactiondb_options);
return Err(other_error("Not able to create an in-memory environment."));
}
rocksdb_options_set_env(options, env);
Some(env)
} else {
None
};
if !column_families.iter().any(|c| c.name == "default") { if !column_families.iter().any(|c| c.name == "default") {
column_families.push(ColumnFamilyDefinition { column_families.push(ColumnFamilyDefinition {
name: "default", name: "default",
@ -327,12 +345,12 @@ impl Db {
ingest_external_file_options, ingest_external_file_options,
compaction_options, compaction_options,
block_based_table_options, block_based_table_options,
env,
column_family_names, column_family_names,
cf_handles, cf_handles,
cf_options, cf_options,
cf_compaction_filters, cf_compaction_filters,
path: path.to_path_buf(), path,
remove_path: in_memory,
}) })
} }
} }
@ -1075,6 +1093,11 @@ unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char
filter.name.as_ptr() filter.name.as_ptr()
} }
struct UnsafeEnv(*mut rocksdb_env_t);
// Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB
unsafe impl Sync for UnsafeEnv {}
fn path_to_cstring(path: &Path) -> Result<CString> { fn path_to_cstring(path: &Path) -> Result<CString> {
CString::new( CString::new(
path.to_str() path.to_str()

Loading…
Cancel
Save