diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 335eeb99..053c9110 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -38,6 +38,7 @@ hex = "0.4" nom = "7" siphasher = "0.3" lasso = "0.5" +lazy_static = "1" sophia_api = { version = "0.7", optional = true } json-event-parser = "0.1" num_cpus = "1" diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 7a15682d..cda31e6e 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -6,6 +6,7 @@ use crate::error::invalid_input_error; use crate::storage::backend::{CompactionAction, CompactionFilter}; +use lazy_static::lazy_static; use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; use oxrocksdb_sys::*; use rand::random; @@ -13,6 +14,7 @@ use std::borrow::Borrow; use std::cell::Cell; use std::env::temp_dir; use std::ffi::{CStr, CString}; +use std::fs::remove_dir_all; use std::io::{Error, ErrorKind, Result}; use std::iter::Zip; use std::ops::Deref; @@ -43,6 +45,23 @@ macro_rules! ffi_result_impl { }} } +lazy_static! { + static ref ROCKSDB_ENV: UnsafeEnv = { + unsafe { + let env = rocksdb_create_default_env(); + assert!(!env.is_null(), "rocksdb_create_default_env returned null"); + UnsafeEnv(env) + } + }; + static ref ROCKSDB_MEM_ENV: UnsafeEnv = { + unsafe { + let env = rocksdb_create_mem_env(); + assert!(!env.is_null(), "rocksdb_create_mem_env returned null"); + UnsafeEnv(env) + } + }; +} + pub struct ColumnFamilyDefinition { pub name: &'static str, pub merge_operator: Option, @@ -69,12 +88,12 @@ struct DbHandler { ingest_external_file_options: *mut rocksdb_ingestexternalfileoptions_t, compaction_options: *mut rocksdb_compactoptions_t, block_based_table_options: *mut rocksdb_block_based_table_options_t, - env: Option<*mut rocksdb_env_t>, column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_options: Vec<*mut rocksdb_options_t>, cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>, path: PathBuf, + remove_path: bool, } impl Drop for DbHandler { @@ -97,13 +116,13 @@ impl Drop for DbHandler { rocksdb_transactiondb_options_destroy(self.transactiondb_options); rocksdb_options_destroy(self.options); rocksdb_block_based_options_destroy(self.block_based_table_options); - if let Some(env) = self.env { - rocksdb_env_destroy(env); - } for cf_compact in &self.cf_compaction_filters { rocksdb_compactionfilter_destroy(*cf_compact); } } + if self.remove_path && self.path.exists() { + remove_dir_all(&self.path).unwrap(); + } } } @@ -114,20 +133,24 @@ impl Db { } else { temp_dir() } - .join("oxigraph-temp-rocksdb"); - Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) + .join(format!("oxigraph-rocksdb-{}", random::())); + Ok(Self(Arc::new(Self::do_open(path, column_families, true)?))) } pub fn open(path: &Path, column_families: Vec) -> Result { - Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) + Ok(Self(Arc::new(Self::do_open( + path.to_owned(), + column_families, + false, + )?))) } fn do_open( - path: &Path, + path: PathBuf, mut column_families: Vec, in_memory: bool, ) -> Result { - let c_path = path_to_cstring(path)?; + let c_path = path_to_cstring(&path)?; unsafe { let options = rocksdb_options_create(); @@ -150,6 +173,14 @@ impl Db { .try_into() .unwrap(), ); + rocksdb_options_set_env( + options, + if in_memory { + ROCKSDB_MEM_ENV.0 + } else { + ROCKSDB_ENV.0 + }, + ); let block_based_table_options = rocksdb_block_based_options_create(); assert!( !block_based_table_options.is_null(), @@ -168,19 +199,6 @@ impl Db { "rocksdb_transactiondb_options_create returned null" ); - let env = if in_memory { - let env = rocksdb_create_mem_env(); - if env.is_null() { - rocksdb_options_destroy(options); - rocksdb_transactiondb_options_destroy(transactiondb_options); - return Err(other_error("Not able to create an in-memory environment.")); - } - rocksdb_options_set_env(options, env); - Some(env) - } else { - None - }; - if !column_families.iter().any(|c| c.name == "default") { column_families.push(ColumnFamilyDefinition { name: "default", @@ -327,12 +345,12 @@ impl Db { ingest_external_file_options, compaction_options, block_based_table_options, - env, column_family_names, cf_handles, cf_options, cf_compaction_filters, - path: path.to_path_buf(), + path, + remove_path: in_memory, }) } } @@ -1075,6 +1093,11 @@ unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char filter.name.as_ptr() } +struct UnsafeEnv(*mut rocksdb_env_t); + +// Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB +unsafe impl Sync for UnsafeEnv {} + fn path_to_cstring(path: &Path) -> Result { CString::new( path.to_str()