RocksDB: Removes merge and compact operators

pull/173/head
Tpt 3 years ago
parent b7ee3a6767
commit e59c4612b2
  1. 19
      lib/src/storage/backend/mod.rs
  2. 215
      lib/src/storage/backend/rocksdb.rs
  3. 30
      lib/src/storage/mod.rs

@ -2,28 +2,13 @@
//! RocksDB is available, if not in memory //! RocksDB is available, if not in memory
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
pub use fallback::{ pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, WriteBatchWithIndex};
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex,
};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub use rocksdb::{ pub use rocksdb::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, Reader, SstFileWriter, ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, SstFileWriter, Transaction,
Transaction,
}; };
use std::ffi::CString;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
mod fallback; mod fallback;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
mod rocksdb; mod rocksdb;
/// User-supplied compaction filter: a decision callback plus the name exposed for it.
pub struct CompactionFilter {
/// Decision callback; the `compactionfilter_filter` shim calls it with `(key, existing_value)`.
pub filter: fn(&[u8], &[u8]) -> CompactionAction,
/// Name of the filter, handed out as a C string pointer by `compactionfilter_name`.
pub name: CString,
}
/// Verdict returned by a `CompactionFilter` callback for a single entry.
// NOTE(review): `warn(dead_code)` is already the default lint level; possibly
// `allow(dead_code)` was intended here. Left untouched, to be confirmed.
#[warn(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionAction {
    /// The entry is kept (`compactionfilter_filter` maps this to `0`).
    Keep,
    /// The entry is removed (`compactionfilter_filter` maps this to `1`).
    Remove,
}

@ -5,9 +5,8 @@
#![allow(unsafe_code)] #![allow(unsafe_code)]
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use crate::storage::backend::{CompactionAction, CompactionFilter};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; use libc::{self, c_char, c_void, free};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use rand::random; use rand::random;
use std::borrow::Borrow; use std::borrow::Borrow;
@ -16,7 +15,6 @@ use std::env::temp_dir;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::iter::Zip;
use std::ops::Deref; use std::ops::Deref;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::rc::Rc; use std::rc::Rc;
@ -64,8 +62,6 @@ lazy_static! {
pub struct ColumnFamilyDefinition { pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
pub use_iter: bool, pub use_iter: bool,
pub min_prefix_size: usize, pub min_prefix_size: usize,
} }
@ -91,7 +87,6 @@ struct DbHandler {
column_family_names: Vec<&'static str>, column_family_names: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>, cf_options: Vec<*mut rocksdb_options_t>,
cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>,
path: PathBuf, path: PathBuf,
remove_path: bool, remove_path: bool,
} }
@ -116,9 +111,6 @@ impl Drop for DbHandler {
rocksdb_transactiondb_options_destroy(self.transactiondb_options); rocksdb_transactiondb_options_destroy(self.transactiondb_options);
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options); rocksdb_block_based_options_destroy(self.block_based_table_options);
for cf_compact in &self.cf_compaction_filters {
rocksdb_compactionfilter_destroy(*cf_compact);
}
} }
if self.remove_path && self.path.exists() { if self.remove_path && self.path.exists() {
remove_dir_all(&self.path).unwrap(); remove_dir_all(&self.path).unwrap();
@ -162,7 +154,6 @@ impl Db {
rocksdb_options_set_info_log_level(options, 2); // We only log warnings rocksdb_options_set_info_log_level(options, 2); // We only log warnings
rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size
rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files
rocksdb_options_set_max_successive_merges(options, 5); // merge are expensive, let's pay the price once
rocksdb_options_set_compression( rocksdb_options_set_compression(
options, options,
if in_memory { if in_memory {
@ -202,8 +193,6 @@ impl Db {
if !column_families.iter().any(|c| c.name == "default") { if !column_families.iter().any(|c| c.name == "default") {
column_families.push(ColumnFamilyDefinition { column_families.push(ColumnFamilyDefinition {
name: "default", name: "default",
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 0, min_prefix_size: 0,
}) })
@ -214,7 +203,6 @@ impl Db {
.map(|name| CString::new(*name)) .map(|name| CString::new(*name))
.collect::<std::result::Result<Vec<_>, _>>() .collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?; .map_err(invalid_input_error)?;
let mut cf_compaction_filters = Vec::new();
let cf_options = column_families let cf_options = column_families
.into_iter() .into_iter()
.map(|cf| { .map(|cf| {
@ -228,36 +216,6 @@ impl Db {
rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size),
); );
} }
if let Some(merge) = cf.merge_operator {
// mergeoperator delete is done automatically
let merge = rocksdb_mergeoperator_create(
Box::into_raw(Box::new(merge)) as *mut c_void,
Some(merge_destructor),
Some(merge_full),
Some(merge_partial),
Some(merge_delete_value),
Some(merge_name),
);
assert!(
!merge.is_null(),
"rocksdb_mergeoperator_create returned null"
);
rocksdb_options_set_merge_operator(options, merge);
}
if let Some(compact) = cf.compaction_filter {
let compact = rocksdb_compactionfilter_create(
Box::into_raw(Box::new(compact)) as *mut c_void,
Some(compactionfilter_destructor),
Some(compactionfilter_filter),
Some(compactionfilter_name),
);
assert!(
!compact.is_null(),
"rocksdb_compactionfilter_create returned null"
);
rocksdb_options_set_compaction_filter(options, compact);
cf_compaction_filters.push(compact);
}
options options
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -348,7 +306,6 @@ impl Db {
column_family_names, column_family_names,
cf_handles, cf_handles,
cf_options, cf_options,
cf_compaction_filters,
path, path,
remove_path: in_memory, remove_path: in_memory,
}) })
@ -754,19 +711,6 @@ impl Transaction {
)) ))
} }
} }
pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transaction_merge_cf(
self.inner.transaction,
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
}
}
} }
pub struct PinnableSlice(*mut rocksdb_pinnableslice_t); pub struct PinnableSlice(*mut rocksdb_pinnableslice_t);
@ -883,18 +827,6 @@ impl Iter {
None None
} }
} }
/// Returns the value at the iterator's current position, or `None` when the
/// iterator is not valid.
pub fn value(&self) -> Option<&[u8]> {
    if !self.is_valid() {
        return None;
    }
    let mut length = 0;
    // SAFETY: the iterator was just checked to be valid; the pointer and the
    // length written by `rocksdb_iter_value` are assumed to describe readable
    // memory for as long as `self` is borrowed (TODO confirm).
    unsafe {
        let data = rocksdb_iter_value(self.iter, &mut length);
        Some(slice::from_raw_parts(data as *const u8, length))
    }
}
} }
pub struct SstFileWriter { pub struct SstFileWriter {
@ -927,18 +859,6 @@ impl SstFileWriter {
self.insert(key, &[]) self.insert(key, &[])
} }
pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
unsafe {
ffi_result!(rocksdb_sstfilewriter_merge(
self.writer,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
}
}
pub fn finish(self) -> Result<PathBuf> { pub fn finish(self) -> Result<PathBuf> {
unsafe { unsafe {
ffi_result!(rocksdb_sstfilewriter_finish(self.writer))?; ffi_result!(rocksdb_sstfilewriter_finish(self.writer))?;
@ -960,139 +880,6 @@ fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Er
Error::new(ErrorKind::InvalidInput, error) Error::new(ErrorKind::InvalidInput, error)
} }
/// User-defined merge operator: the two merge callbacks plus the name exposed for it.
pub struct MergeOperator {
/// Full merge; `merge_full` calls it with the key, the optional existing value and the
/// operands, and hands its output back as the new value.
pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec<u8>,
/// Partial merge; `merge_partial` calls it with the key and the operands.
pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec<u8>,
/// Name of the operator, handed out as a C string pointer by `merge_name`.
pub name: CString,
}
/// Destructor callback registered alongside the merge operator state.
///
/// # Safety
/// `operator` must be the pointer obtained from
/// `Box::into_raw(Box::new(MergeOperator { .. }))` and must not be used again
/// after this call.
unsafe extern "C" fn merge_destructor(operator: *mut c_void) {
    // The box is rebuilt only to release it; dropping explicitly states the
    // intent and satisfies the `#[must_use]` on `Box::from_raw`.
    drop(Box::from_raw(operator as *mut MergeOperator));
}
/// Full-merge callback: forwards to `MergeOperator::full` and returns the merged
/// bytes as a leaked boxed slice (released later by `merge_delete_value`).
///
/// Always reports success through `success` and stores the byte length of the
/// result in `new_value_length`.
///
/// # Safety
/// `operator` must point to a live `MergeOperator`; every pointer/length pair
/// must describe readable memory; `success` and `new_value_length` must be
/// writable.
unsafe extern "C" fn merge_full(
    operator: *mut c_void,
    key: *const c_char,
    key_length: size_t,
    existing_value: *const c_char,
    existing_value_len: size_t,
    operands_list: *const *const c_char,
    operands_list_length: *const size_t,
    num_operands: c_int,
    success: *mut u8,
    new_value_length: *mut size_t,
) -> *mut c_char {
    let merge = &*(operator as *const MergeOperator);
    let key_bytes = slice::from_raw_parts(key as *const u8, key_length);
    // A null pointer is how "no existing value" is signalled to this callback.
    let current = if existing_value.is_null() {
        None
    } else {
        Some(slice::from_raw_parts(
            existing_value as *const u8,
            existing_value_len,
        ))
    };
    let operands = SlicesIterator::new(operands_list, operands_list_length, num_operands);
    let merged = (merge.full)(key_bytes, current, operands).into_boxed_slice();
    *new_value_length = merged.len();
    *success = 1_u8;
    Box::into_raw(merged) as *mut c_char
}
/// Partial-merge callback: forwards to `MergeOperator::partial` and returns the
/// merged bytes as a leaked boxed slice (released later by `merge_delete_value`).
///
/// Always reports success through `success` and stores the byte length of the
/// result in `new_value_length`.
///
/// # Safety
/// `operator` must point to a live `MergeOperator`; every pointer/length pair
/// must describe readable memory; `success` and `new_value_length` must be
/// writable.
pub unsafe extern "C" fn merge_partial(
    operator: *mut c_void,
    key: *const c_char,
    key_length: size_t,
    operands_list: *const *const c_char,
    operands_list_length: *const size_t,
    num_operands: c_int,
    success: *mut u8,
    new_value_length: *mut size_t,
) -> *mut c_char {
    let merge = &*(operator as *const MergeOperator);
    let key_bytes = slice::from_raw_parts(key as *const u8, key_length);
    let operands = SlicesIterator::new(operands_list, operands_list_length, num_operands);
    let merged = (merge.partial)(key_bytes, operands).into_boxed_slice();
    *new_value_length = merged.len();
    *success = 1_u8;
    Box::into_raw(merged) as *mut c_char
}
/// Releases a buffer previously returned by `merge_full` or `merge_partial`.
///
/// A null `value` is ignored.
///
/// # Safety
/// A non-null `value` must be a pointer obtained from `Box::into_raw` on a
/// `Box<[u8]>` of exactly `value_length` bytes, and must not be used again
/// after this call.
unsafe extern "C" fn merge_delete_value(
    _operator: *mut c_void,
    value: *const c_char,
    value_length: size_t,
) {
    if value.is_null() {
        return;
    }
    // Rebuild the fat pointer directly instead of going through a temporary
    // `&mut [u8]`, then drop the box explicitly (`Box::from_raw` is `#[must_use]`).
    let buffer = std::ptr::slice_from_raw_parts_mut(value as *mut u8, value_length);
    drop(Box::from_raw(buffer));
}
unsafe extern "C" fn merge_name(operator: *mut c_void) -> *const c_char {
let operator = &*(operator as *const MergeOperator);
operator.name.as_ptr()
}
/// Iterator over merge operands received as two parallel C arrays: one of data
/// pointers and one of byte lengths. Each item is the operand as a byte slice.
pub struct SlicesIterator<'a>(
    Zip<std::slice::Iter<'a, *const c_char>, std::slice::Iter<'a, size_t>>,
);

impl<'a> SlicesIterator<'a> {
    /// Wraps the two parallel arrays describing `num_operands` operands.
    ///
    /// # Safety
    /// When `num_operands` is positive, `operands_list` and
    /// `operands_list_length` must each point to `num_operands` readable
    /// elements, and every operand with a non-zero length must point to that
    /// many readable bytes, all valid for `'a`.
    ///
    /// # Panics
    /// Panics if `num_operands` is negative.
    unsafe fn new(
        operands_list: *const *const c_char,
        operands_list_length: *const size_t,
        num_operands: c_int,
    ) -> Self {
        let num_operands = usize::try_from(num_operands)
            .expect("the number of merge operands must not be negative");
        if num_operands == 0 {
            // `slice::from_raw_parts` requires non-null pointers even for empty
            // slices, and a C caller may pass null arrays when there is nothing
            // to iterate over.
            let no_pointers: &[*const c_char] = &[];
            let no_lengths: &[size_t] = &[];
            return Self(no_pointers.iter().zip(no_lengths));
        }
        Self(
            slice::from_raw_parts(operands_list, num_operands)
                .iter()
                .zip(slice::from_raw_parts(operands_list_length, num_operands)),
        )
    }
}

impl<'a> Iterator for SlicesIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        let (&data, &len) = self.0.next()?;
        if len == 0 {
            // An empty operand may come with a null data pointer; never hand
            // that to `slice::from_raw_parts`.
            let empty: &[u8] = &[];
            return Some(empty);
        }
        // SAFETY: guaranteed by the contract of `SlicesIterator::new`.
        Some(unsafe { slice::from_raw_parts(data as *const u8, len) })
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // The number of remaining operands is known exactly.
        self.0.size_hint()
    }
}
/// Destructor callback registered alongside the compaction filter state.
///
/// # Safety
/// `filter` must be the pointer obtained from
/// `Box::into_raw(Box::new(CompactionFilter { .. }))` and must not be used
/// again after this call.
unsafe extern "C" fn compactionfilter_destructor(filter: *mut c_void) {
    // The box is rebuilt only to release it; dropping explicitly states the
    // intent and satisfies the `#[must_use]` on `Box::from_raw`.
    drop(Box::from_raw(filter as *mut CompactionFilter));
}
/// Filter callback: forwards the key and existing value to
/// `CompactionFilter::filter` and translates its verdict into `0` (keep) or
/// `1` (remove).
///
/// The level and the three `new_value` out-parameters are not used.
///
/// # Safety
/// `filter` must point to a live `CompactionFilter`, and both pointer/length
/// pairs must describe readable memory.
unsafe extern "C" fn compactionfilter_filter(
    filter: *mut c_void,
    _level: c_int,
    key: *const c_char,
    key_length: size_t,
    existing_value: *const c_char,
    value_length: size_t,
    _new_value: *mut *mut c_char,
    _new_value_length: *mut size_t,
    _value_changed: *mut c_uchar,
) -> c_uchar {
    let callback = (*(filter as *const CompactionFilter)).filter;
    let key_bytes = slice::from_raw_parts(key as *const u8, key_length);
    let value_bytes = slice::from_raw_parts(existing_value as *const u8, value_length);
    let verdict = callback(key_bytes, value_bytes);
    match verdict {
        CompactionAction::Remove => 1,
        CompactionAction::Keep => 0,
    }
}
unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char {
let filter = &*(filter as *const CompactionFilter);
filter.name.as_ptr()
}
struct UnsafeEnv(*mut rocksdb_env_t); struct UnsafeEnv(*mut rocksdb_env_t);
// Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB // Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB

@ -8,13 +8,9 @@ use crate::storage::binary_encoder::{
LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE, LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE,
}; };
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator,
};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::collections::{hash_map, HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::ffi::CString;
use std::io::Result; use std::io::Result;
use std::mem::swap; use std::mem::swap;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -75,78 +71,56 @@ impl Storage {
vec![ vec![
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: ID2STR_CF, name: ID2STR_CF,
merge_operator: None,
compaction_filter: None,
use_iter: false, use_iter: false,
min_prefix_size: 0, min_prefix_size: 0,
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: SPOG_CF, name: SPOG_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: POSG_CF, name: POSG_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named node start min_prefix_size: 17, // named node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: OSPG_CF, name: OSPG_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 0, // There are small literals... min_prefix_size: 0, // There are small literals...
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GSPO_CF, name: GSPO_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GPOS_CF, name: GPOS_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GOSP_CF, name: GOSP_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DSPO_CF, name: DSPO_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DPOS_CF, name: DPOS_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: DOSP_CF, name: DOSP_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 0, // There are small literals... min_prefix_size: 0, // There are small literals...
}, },
ColumnFamilyDefinition { ColumnFamilyDefinition {
name: GRAPHS_CF, name: GRAPHS_CF,
merge_operator: None,
compaction_filter: None,
use_iter: true, use_iter: true,
min_prefix_size: 17, // named or blank node start min_prefix_size: 17, // named or blank node start
}, },

Loading…
Cancel
Save