From e59c4612b2bfaabffdaa86e325ff7d13635d763f Mon Sep 17 00:00:00 2001 From: Tpt Date: Sat, 27 Nov 2021 20:13:47 +0100 Subject: [PATCH] RocksDB: Removes merge and compact operators --- lib/src/storage/backend/mod.rs | 19 +-- lib/src/storage/backend/rocksdb.rs | 215 +---------------------------- lib/src/storage/mod.rs | 30 +--- 3 files changed, 5 insertions(+), 259 deletions(-) diff --git a/lib/src/storage/backend/mod.rs b/lib/src/storage/backend/mod.rs index bcc2f640..9108d592 100644 --- a/lib/src/storage/backend/mod.rs +++ b/lib/src/storage/backend/mod.rs @@ -2,28 +2,13 @@ //! RocksDB is available, if not in memory #[cfg(target_arch = "wasm32")] -pub use fallback::{ - ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex, -}; +pub use fallback::{ColumnFamily, ColumnFamilyDefinition, Db, Iter, WriteBatchWithIndex}; #[cfg(not(target_arch = "wasm32"))] pub use rocksdb::{ - ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, Reader, SstFileWriter, - Transaction, + ColumnFamily, ColumnFamilyDefinition, Db, Iter, Reader, SstFileWriter, Transaction, }; -use std::ffi::CString; #[cfg(target_arch = "wasm32")] mod fallback; #[cfg(not(target_arch = "wasm32"))] mod rocksdb; - -pub struct CompactionFilter { - pub filter: fn(&[u8], &[u8]) -> CompactionAction, - pub name: CString, -} - -#[warn(dead_code)] -pub enum CompactionAction { - Keep, - Remove, -} diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index cda31e6e..8818bf05 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -5,9 +5,8 @@ #![allow(unsafe_code)] use crate::error::invalid_input_error; -use crate::storage::backend::{CompactionAction, CompactionFilter}; use lazy_static::lazy_static; -use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; +use libc::{self, c_char, c_void, free}; use oxrocksdb_sys::*; use rand::random; use std::borrow::Borrow; @@ -16,7 +15,6 @@ use std::env::temp_dir; use std::ffi::{CStr, CString}; use std::fs::remove_dir_all; use std::io::{Error, ErrorKind, Result}; -use std::iter::Zip; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::rc::Rc; @@ -64,8 +62,6 @@ lazy_static! { pub struct ColumnFamilyDefinition { pub name: &'static str, - pub merge_operator: Option, - pub compaction_filter: Option, pub use_iter: bool, pub min_prefix_size: usize, } @@ -91,7 +87,6 @@ struct DbHandler { column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_options: Vec<*mut rocksdb_options_t>, - cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>, path: PathBuf, remove_path: bool, } @@ -116,9 +111,6 @@ impl Drop for DbHandler { rocksdb_transactiondb_options_destroy(self.transactiondb_options); rocksdb_options_destroy(self.options); rocksdb_block_based_options_destroy(self.block_based_table_options); - for cf_compact in &self.cf_compaction_filters { - rocksdb_compactionfilter_destroy(*cf_compact); - } } if self.remove_path && self.path.exists() { remove_dir_all(&self.path).unwrap(); @@ -162,7 +154,6 @@ impl Db { rocksdb_options_set_info_log_level(options, 2); // We only log warnings rocksdb_options_set_max_log_file_size(options, 1024 * 1024); // Only 1MB log size rocksdb_options_set_recycle_log_file_num(options, 10); // We do not keep more than 10 log files - rocksdb_options_set_max_successive_merges(options, 5); // merge are expensive, let's pay the price once rocksdb_options_set_compression( options, if in_memory { @@ -202,8 +193,6 @@ impl Db { if !column_families.iter().any(|c| c.name == "default") { column_families.push(ColumnFamilyDefinition { name: "default", - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 0, }) @@ -214,7 +203,6 @@ impl Db { .map(|name| CString::new(*name)) .collect::, _>>() .map_err(invalid_input_error)?; - let mut cf_compaction_filters = Vec::new(); let cf_options = column_families .into_iter() .map(|cf| { @@ -228,36 +216,6 @@ impl Db { rocksdb_slicetransform_create_fixed_prefix(cf.min_prefix_size), ); } - if let Some(merge) = cf.merge_operator { - // mergeoperator delete is done automatically - let merge = rocksdb_mergeoperator_create( - Box::into_raw(Box::new(merge)) as *mut c_void, - Some(merge_destructor), - Some(merge_full), - Some(merge_partial), - Some(merge_delete_value), - Some(merge_name), - ); - assert!( - !merge.is_null(), - "rocksdb_mergeoperator_create returned null" - ); - rocksdb_options_set_merge_operator(options, merge); - } - if let Some(compact) = cf.compaction_filter { - let compact = rocksdb_compactionfilter_create( - Box::into_raw(Box::new(compact)) as *mut c_void, - Some(compactionfilter_destructor), - Some(compactionfilter_filter), - Some(compactionfilter_name), - ); - assert!( - !compact.is_null(), - "rocksdb_compactionfilter_create returned null" - ); - rocksdb_options_set_compaction_filter(options, compact); - cf_compaction_filters.push(compact); - } options }) .collect::>(); @@ -348,7 +306,6 @@ impl Db { column_family_names, cf_handles, cf_options, - cf_compaction_filters, path, remove_path: in_memory, }) @@ -754,19 +711,6 @@ impl Transaction { )) } } - - pub fn merge(&mut self, column_family: &ColumnFamily, key: &[u8], value: &[u8]) -> Result<()> { - unsafe { - ffi_result!(rocksdb_transaction_merge_cf( - self.inner.transaction, - column_family.0, - key.as_ptr() as *const c_char, - key.len(), - value.as_ptr() as *const c_char, - value.len(), - )) - } - } } pub struct PinnableSlice(*mut rocksdb_pinnableslice_t); @@ -883,18 +827,6 @@ impl Iter { None } } - - pub fn value(&self) -> Option<&[u8]> { - if self.is_valid() { - unsafe { - let mut len = 0; - let val = rocksdb_iter_value(self.iter, &mut len); - Some(slice::from_raw_parts(val as *const u8, len)) - } - } else { - None - } - } } pub struct SstFileWriter { @@ -927,18 +859,6 @@ impl SstFileWriter { self.insert(key, &[]) } - pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<()> { - unsafe { - ffi_result!(rocksdb_sstfilewriter_merge( - self.writer, - key.as_ptr() as *const c_char, - key.len(), - value.as_ptr() as *const c_char, - value.len(), - )) - } - } - pub fn finish(self) -> Result { unsafe { ffi_result!(rocksdb_sstfilewriter_finish(self.writer))?; @@ -960,139 +880,6 @@ fn other_error(error: impl Into>) -> Er Error::new(ErrorKind::InvalidInput, error) } -pub struct MergeOperator { - pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec, - pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec, - pub name: CString, -} - -unsafe extern "C" fn merge_destructor(operator: *mut c_void) { - Box::from_raw(operator as *mut MergeOperator); -} - -unsafe extern "C" fn merge_full( - operator: *mut c_void, - key: *const c_char, - key_length: size_t, - existing_value: *const c_char, - existing_value_len: size_t, - operands_list: *const *const c_char, - operands_list_length: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let operator = &*(operator as *const MergeOperator); - let result = (operator.full)( - slice::from_raw_parts(key as *const u8, key_length), - if existing_value.is_null() { - None - } else { - Some(slice::from_raw_parts( - existing_value as *const u8, - existing_value_len, - )) - }, - SlicesIterator::new(operands_list, operands_list_length, num_operands), - ); - *new_value_length = result.len(); - *success = 1_u8; - Box::into_raw(result.into_boxed_slice()) as *mut c_char -} - -pub unsafe extern "C" fn merge_partial( - operator: *mut c_void, - key: *const c_char, - key_length: size_t, - operands_list: *const *const c_char, - operands_list_length: *const size_t, - num_operands: c_int, - success: *mut u8, - new_value_length: *mut size_t, -) -> *mut c_char { - let operator = &*(operator as *const MergeOperator); - let result = (operator.partial)( - slice::from_raw_parts(key as *const u8, key_length), - SlicesIterator::new(operands_list, operands_list_length, num_operands), - ); - *new_value_length = result.len(); - *success = 1_u8; - Box::into_raw(result.into_boxed_slice()) as *mut c_char -} - -unsafe extern "C" fn merge_delete_value( - _operator: *mut c_void, - value: *const c_char, - value_length: size_t, -) { - if !value.is_null() { - Box::from_raw(slice::from_raw_parts_mut(value as *mut u8, value_length)); - } -} - -unsafe extern "C" fn merge_name(operator: *mut c_void) -> *const c_char { - let operator = &*(operator as *const MergeOperator); - operator.name.as_ptr() -} - -pub struct SlicesIterator<'a>( - Zip, std::slice::Iter<'a, size_t>>, -); - -impl<'a> SlicesIterator<'a> { - unsafe fn new( - operands_list: *const *const c_char, - operands_list_length: *const size_t, - num_operands: c_int, - ) -> Self { - let num_operands = usize::try_from(num_operands).unwrap(); - Self( - slice::from_raw_parts(operands_list, num_operands) - .iter() - .zip(slice::from_raw_parts(operands_list_length, num_operands)), - ) - } -} - -impl<'a> Iterator for SlicesIterator<'a> { - type Item = &'a [u8]; - - fn next(&mut self) -> Option { - let (slice, len) = self.0.next()?; - Some(unsafe { slice::from_raw_parts(*slice as *const u8, *len) }) - } -} - -unsafe extern "C" fn compactionfilter_destructor(filter: *mut c_void) { - Box::from_raw(filter as *mut CompactionFilter); -} - -unsafe extern "C" fn compactionfilter_filter( - filter: *mut c_void, - _level: c_int, - key: *const c_char, - key_length: size_t, - existing_value: *const c_char, - value_length: size_t, - _new_value: *mut *mut c_char, - _new_value_length: *mut size_t, - _value_changed: *mut c_uchar, -) -> c_uchar { - let filter = &*(filter as *const CompactionFilter); - match (filter.filter)( - slice::from_raw_parts(key as *const u8, key_length), - slice::from_raw_parts(existing_value as *const u8, value_length), - ) { - CompactionAction::Keep => 0, - CompactionAction::Remove => 1, - } -} - -unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char { - let filter = &*(filter as *const CompactionFilter); - filter.name.as_ptr() -} - struct UnsafeEnv(*mut rocksdb_env_t); // Hack for lazy_static. OK because only written in lazy static and used in a thread-safe way by RocksDB diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 777c85bc..a11f321f 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -8,13 +8,9 @@ use crate::storage::binary_encoder::{ LATEST_STORAGE_VERSION, WRITTEN_TERM_MAX_SIZE, }; use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup}; -use backend::{ - ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, - MergeOperator, -}; +use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; #[cfg(not(target_arch = "wasm32"))] -use std::collections::{hash_map, HashMap, HashSet}; -use std::ffi::CString; +use std::collections::{HashMap, HashSet}; use std::io::Result; use std::mem::swap; #[cfg(not(target_arch = "wasm32"))] @@ -75,78 +71,56 @@ impl Storage { vec![ ColumnFamilyDefinition { name: ID2STR_CF, - merge_operator: None, - compaction_filter: None, use_iter: false, min_prefix_size: 0, }, ColumnFamilyDefinition { name: SPOG_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: POSG_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named node start }, ColumnFamilyDefinition { name: OSPG_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 0, // There are small literals... }, ColumnFamilyDefinition { name: GSPO_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: GPOS_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: GOSP_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: DSPO_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: DPOS_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start }, ColumnFamilyDefinition { name: DOSP_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 0, // There are small literals... }, ColumnFamilyDefinition { name: GRAPHS_CF, - merge_operator: None, - compaction_filter: None, use_iter: true, min_prefix_size: 17, // named or blank node start },