diff --git a/lib/src/storage/fallback_backend.rs b/lib/src/storage/fallback_backend.rs index f2794886..de3f50a5 100644 --- a/lib/src/storage/fallback_backend.rs +++ b/lib/src/storage/fallback_backend.rs @@ -1,21 +1,40 @@ //! TODO: This storage is dramatically naive. +use std::collections::btree_map::Entry; use std::collections::BTreeMap; +use std::ffi::CString; use std::io::Result; +use std::iter::{once, Once}; use std::sync::{Arc, RwLock}; pub struct ColumnFamilyDefinition { pub name: &'static str, + pub merge_operator: Option, + pub compaction_filter: Option, } #[derive(Clone)] -pub struct Db(Arc, Vec>>>>); +pub struct Db(Arc>>); + +#[derive(Default)] +struct Tree { + tree: BTreeMap, Vec>, + merge_operator: Option, + compaction_filter: Option, +} impl Db { pub fn new(column_families: Vec) -> Result { let mut trees = BTreeMap::new(); for cf in column_families { - trees.insert(ColumnFamily(cf.name), BTreeMap::default()); + trees.insert( + ColumnFamily(cf.name), + Tree { + tree: BTreeMap::default(), + merge_operator: cf.merge_operator, + compaction_filter: cf.compaction_filter, + }, + ); } trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. Ok(Self(Arc::new(RwLock::new(trees)))) @@ -30,7 +49,7 @@ impl Db { } } - pub fn flush(&self) -> Result<()> { + pub fn flush(&self, _column_family: &ColumnFamily) -> Result<()> { Ok(()) } @@ -41,6 +60,7 @@ impl Db { .unwrap() .get(column_family) .unwrap() + .tree .get(key) .map(|v| v.to_vec())) } @@ -52,6 +72,7 @@ impl Db { .unwrap() .get(column_family) .unwrap() + .tree .contains_key(key.as_ref())) } @@ -62,12 +83,18 @@ impl Db { value: &[u8], _low_priority: bool, ) -> Result<()> { - self.0 - .write() - .unwrap() - .get_mut(column_family) - .unwrap() - .insert(key.into(), value.into()); + let mut db = self.0.write().unwrap(); + let tree = db.get_mut(column_family).unwrap(); + let action = if let Some(filter) = &tree.compaction_filter { + (filter.filter)(key, value) + } else { + CompactionAction::Keep + }; + match action { + CompactionAction::Keep => tree.tree.insert(key.into(), value.into()), + CompactionAction::Remove => tree.tree.remove(key), + CompactionAction::Replace(value) => tree.tree.insert(key.into(), value), + }; Ok(()) } @@ -92,17 +119,70 @@ impl Db { .unwrap() .get_mut(column_family) .unwrap() + .tree .remove(key.as_ref()) .is_some()) } + pub fn merge( + &self, + column_family: &ColumnFamily, + key: &[u8], + value: &[u8], + _low_priority: bool, + ) -> Result<()> { + let mut db = self.0.write().unwrap(); + let tree = db.get_mut(column_family).unwrap(); + match tree.tree.entry(key.into()) { + Entry::Vacant(e) => { + let value = if let Some(merge) = &tree.merge_operator { + (merge.full)(key, None, once(value)) + } else { + value.into() + }; + let action = if let Some(filter) = &tree.compaction_filter { + (filter.filter)(key, &value) + } else { + CompactionAction::Keep + }; + match action { + CompactionAction::Keep => { + e.insert(value); + } + CompactionAction::Remove => (), + CompactionAction::Replace(value) => { + e.insert(value); + } + } + } + Entry::Occupied(mut e) => { + let value = if let Some(merge) = &tree.merge_operator { + (merge.full)(key, Some(&e.get()), once(value)) + } else { + value.into() + }; + let action = if let Some(filter) = &tree.compaction_filter { + (filter.filter)(key, &value) + } else { + CompactionAction::Keep + }; + match action { + CompactionAction::Keep => e.insert(value), + CompactionAction::Remove => e.remove(), + CompactionAction::Replace(value) => e.insert(value), + }; + } + } + Ok(()) + } + pub fn iter(&self, column_family: &ColumnFamily) -> Iter { self.scan_prefix(column_family, &[]) } pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter { let trees = self.0.read().unwrap(); - let tree = trees.get(column_family).unwrap(); + let tree = &trees.get(column_family).unwrap().tree; let data: Vec<_> = if prefix.is_empty() { tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect() } else { @@ -117,7 +197,14 @@ impl Db { } pub fn len(&self, column_family: &ColumnFamily) -> Result { - Ok(self.0.read().unwrap().get(column_family).unwrap().len()) + Ok(self + .0 + .read() + .unwrap() + .get(column_family) + .unwrap() + .tree + .len()) } pub fn is_empty(&self, column_family: &ColumnFamily) -> Result { @@ -127,6 +214,7 @@ impl Db { .unwrap() .get(column_family) .unwrap() + .tree .is_empty()) } } @@ -156,3 +244,23 @@ impl Iter { Ok(()) } } + +pub struct MergeOperator { + pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec, + pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec, + pub name: CString, +} + +pub type SlicesIterator<'a> = Once<&'a [u8]>; + +pub struct CompactionFilter { + pub filter: fn(&[u8], &[u8]) -> CompactionAction, + pub name: CString, +} + +#[allow(dead_code)] +pub enum CompactionAction { + Keep, + Remove, + Replace(Vec), +} diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index eb37c66f..1b525a4b 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -8,9 +8,16 @@ use crate::storage::binary_encoder::{ }; use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder}; #[cfg(target_arch = "wasm32")] -use fallback_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; +use fallback_backend::{ + ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, + MergeOperator, +}; #[cfg(not(target_arch = "wasm32"))] -use rocksdb_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; +use rocksdb_backend::{ + ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, + MergeOperator, +}; +use std::ffi::CString; #[cfg(not(target_arch = "wasm32"))] use std::path::Path; @@ -66,20 +73,104 @@ impl Storage { fn column_families() -> Vec { vec![ - ColumnFamilyDefinition { name: ID2STR_CF }, - ColumnFamilyDefinition { name: SPOG_CF }, - ColumnFamilyDefinition { name: POSG_CF }, - ColumnFamilyDefinition { name: OSPG_CF }, - ColumnFamilyDefinition { name: GSPO_CF }, - ColumnFamilyDefinition { name: GPOS_CF }, - ColumnFamilyDefinition { name: GOSP_CF }, - ColumnFamilyDefinition { name: DSPO_CF }, - ColumnFamilyDefinition { name: DPOS_CF }, - ColumnFamilyDefinition { name: DOSP_CF }, - ColumnFamilyDefinition { name: GRAPHS_CF }, + ColumnFamilyDefinition { + name: ID2STR_CF, + merge_operator: Some(Self::str2id_merge()), + compaction_filter: Some(Self::str2id_filter()), + }, + ColumnFamilyDefinition { + name: SPOG_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: POSG_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: OSPG_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: GSPO_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: GPOS_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: GOSP_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: DSPO_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: DPOS_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: DOSP_CF, + merge_operator: None, + compaction_filter: None, + }, + ColumnFamilyDefinition { + name: GRAPHS_CF, + merge_operator: None, + compaction_filter: None, + }, ] } + fn str2id_merge() -> MergeOperator { + fn merge_counted_values<'a>(values: impl Iterator) -> Vec { + let (counter, str) = + values.fold((0, [].as_ref()), |(prev_counter, prev_str), current| { + ( + prev_counter + i32::from_be_bytes(current[..4].try_into().unwrap()), + if prev_str.is_empty() { + ¤t[4..] + } else { + prev_str + }, + ) + }); + let mut buffer = Vec::with_capacity(str.len() + 4); + buffer.extend_from_slice(&counter.to_be_bytes()); + buffer.extend_from_slice(str); + buffer + } + + MergeOperator { + full: |_, previous, values| merge_counted_values(previous.into_iter().chain(values)), + partial: |_, values| merge_counted_values(values), + name: CString::new("id2str_merge").unwrap(), + } + } + + fn str2id_filter() -> CompactionFilter { + CompactionFilter { + filter: |_, value| { + let counter = i32::from_be_bytes(value[..4].try_into().unwrap()); + if counter > 0 { + CompactionAction::Keep + } else { + CompactionAction::Remove + } + }, + name: CString::new("id2str_compaction_filter").unwrap(), + } + } + fn setup(db: Db) -> std::io::Result { let this = Self { default_cf: db.column_family(DEFAULT_CF).unwrap(), @@ -107,24 +198,26 @@ impl Storage { .insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?; } } + this.db.flush(&this.graphs_cf)?; version = 1; this.set_version(version)?; - this.db.flush()?; + this.db.flush(&this.default_cf)?; } if version == 1 { // We migrate to v2 let mut iter = this.db.iter(&this.id2str_cf); while let (Some(key), Some(value)) = (iter.key(), iter.value()) { let mut new_value = Vec::with_capacity(value.len() + 4); - new_value.extend_from_slice(&u32::MAX.to_be_bytes()); + new_value.extend_from_slice(&i32::MAX.to_be_bytes()); new_value.extend_from_slice(value); this.db.insert(&this.id2str_cf, key, &new_value, false)?; iter.next(); } iter.status()?; + this.db.flush(&this.id2str_cf)?; version = 2; this.set_version(version)?; - this.db.flush()?; + this.db.flush(&this.default_cf)?; } match version { @@ -684,13 +777,30 @@ impl Storage { #[cfg(not(target_arch = "wasm32"))] pub fn flush(&self) -> std::io::Result<()> { - self.db.flush() + self.db.flush(&self.default_cf)?; + self.db.flush(&self.gpos_cf)?; + self.db.flush(&self.gpos_cf)?; + self.db.flush(&self.gosp_cf)?; + self.db.flush(&self.spog_cf)?; + self.db.flush(&self.posg_cf)?; + self.db.flush(&self.ospg_cf)?; + self.db.flush(&self.dspo_cf)?; + self.db.flush(&self.dpos_cf)?; + self.db.flush(&self.dosp_cf)?; + self.db.flush(&self.id2str_cf) } pub fn get_str(&self, key: &StrHash) -> std::io::Result> { self.db .get(&self.id2str_cf, &key.to_be_bytes())? - .map(|v| String::from_utf8(v[4..].to_vec())) + .and_then(|v| { + let count = i32::from_be_bytes(v[..4].try_into().unwrap()); + if count > 0 { + Some(String::from_utf8(v[4..].to_vec())) + } else { + None + } + }) .transpose() .map_err(invalid_data_error) } @@ -774,37 +884,20 @@ impl TermEncoder for Storage { type Error = std::io::Error; fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> { - if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? { - let mut value = value.to_vec(); - let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); - let new_number = number.saturating_add(1); - value[..4].copy_from_slice(&new_number.to_be_bytes()); - self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)? - } else { - let mut buffer = Vec::with_capacity(value.len() + 4); - buffer.extend_from_slice(&1_u32.to_be_bytes()); - buffer.extend_from_slice(value.as_bytes()); - self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)?; - } - Ok(()) + let mut buffer = Vec::with_capacity(value.len() + 4); + buffer.extend_from_slice(&1_i32.to_be_bytes()); + buffer.extend_from_slice(value.as_bytes()); + self.db + .merge(&self.id2str_cf, &key.to_be_bytes(), &buffer, false) } fn remove_str(&self, key: &StrHash) -> std::io::Result<()> { - if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? { - let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); - let new_number = number.saturating_sub(1); - if new_number == 0 { - self.db.remove(&self.id2str_cf, &key.to_be_bytes(), true)?; - } else { - let mut value = value.to_vec(); - value[..4].copy_from_slice(&new_number.to_be_bytes()); - self.db - .insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?; - } - } - Ok(()) + self.db.merge( + &self.id2str_cf, + &key.to_be_bytes(), + &(-1_i32).to_be_bytes(), + true, + ) } } @@ -860,6 +953,8 @@ mod tests { storage.insert(quad)?; storage.insert(quad2)?; storage.remove(quad2)?; + storage.flush()?; + storage.db.compact(&storage.id2str_cf)?; assert!(storage .get_str(&StrHash::new("http://example.com/s"))? .is_some()); diff --git a/lib/src/storage/rocksdb_backend.rs b/lib/src/storage/rocksdb_backend.rs index 1622fab3..56863f37 100644 --- a/lib/src/storage/rocksdb_backend.rs +++ b/lib/src/storage/rocksdb_backend.rs @@ -1,9 +1,11 @@ //! Code inspired by [https://github.com/rust-rocksdb/rust-rocksdb][Rust RocksDB] under Apache License 2.0. +//! +//! TODO: still has some memory leaks if the database opening fails #![allow(unsafe_code)] use crate::error::invalid_input_error; -use libc::{self, c_char, c_void}; +use libc::{self, c_char, c_int, c_uchar, c_void, size_t}; use oxrocksdb_sys::*; use std::borrow::Borrow; use std::env::temp_dir; @@ -39,6 +41,8 @@ macro_rules! ffi_result_impl { pub struct ColumnFamilyDefinition { pub name: &'static str, + pub merge_operator: Option, + pub compaction_filter: Option, } #[derive(Clone)] @@ -55,9 +59,12 @@ struct DbHandler { write_options: *mut rocksdb_writeoptions_t, low_priority_write_options: *mut rocksdb_writeoptions_t, flush_options: *mut rocksdb_flushoptions_t, + compaction_options: *mut rocksdb_compactoptions_t, env: Option<*mut rocksdb_env_t>, - column_families: Vec, + column_family_names: Vec<&'static str>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>, + cf_options: Vec<*mut rocksdb_options_t>, + cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>, } impl Drop for DbHandler { @@ -67,15 +74,22 @@ impl Drop for DbHandler { rocksdb_column_family_handle_destroy(*cf_handle); } rocksdb_transactiondb_close(self.db); + for cf_option in &self.cf_options { + rocksdb_options_destroy(*cf_option); + } rocksdb_readoptions_destroy(self.read_options); rocksdb_writeoptions_destroy(self.write_options); rocksdb_writeoptions_destroy(self.low_priority_write_options); rocksdb_flushoptions_destroy(self.flush_options); + rocksdb_compactoptions_destroy(self.compaction_options); rocksdb_transactiondb_options_destroy(self.txn_options); rocksdb_options_destroy(self.options); if let Some(env) = self.env { rocksdb_env_destroy(env); } + for cf_compact in &self.cf_compaction_filters { + rocksdb_compactionfilter_destroy(*cf_compact); + } } } } @@ -145,47 +159,76 @@ impl Db { }; if !column_families.iter().any(|c| c.name == "default") { - column_families.push(ColumnFamilyDefinition { name: "default" }) + column_families.push(ColumnFamilyDefinition { + name: "default", + merge_operator: None, + compaction_filter: None, + }) } - let c_column_families = column_families + let column_family_names = column_families.iter().map(|c| c.name).collect::>(); + let c_column_families = column_family_names .iter() - .map(|cf| CString::new(cf.name)) + .map(|name| CString::new(*name)) .collect::, _>>() .map_err(invalid_input_error)?; - let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()]; + let mut cf_compaction_filters = Vec::new(); + let cf_options = column_families + .into_iter() + .map(|cf| { + let options = rocksdb_options_create_copy(options); + if let Some(merge) = cf.merge_operator { + // mergeoperator delete is done automatically + let merge = rocksdb_mergeoperator_create( + Box::into_raw(Box::new(merge)) as *mut c_void, + Some(merge_destructor), + Some(merge_full), + Some(merge_partial), + Some(merge_delete_value), + Some(merge_name), + ); + assert!( + !merge.is_null(), + "rocksdb_mergeoperator_create returned null" + ); + rocksdb_options_set_merge_operator(options, merge); + } + if let Some(compact) = cf.compaction_filter { + let compact = rocksdb_compactionfilter_create( + Box::into_raw(Box::new(compact)) as *mut c_void, + Some(compactionfilter_destructor), + Some(compactionfilter_filter), + Some(compactionfilter_name), + ); + assert!( + !compact.is_null(), + "rocksdb_compactionfilter_create returned null" + ); + rocksdb_options_set_compaction_filter(options, compact); + cf_compaction_filters.push(compact); + } + options + }) + .collect::>(); let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = - vec![ptr::null_mut(); column_families.len()]; + vec![ptr::null_mut(); column_family_names.len()]; let db = ffi_result!(rocksdb_transactiondb_open_column_families( options, txn_options, c_path.as_ptr(), - column_families.len().try_into().unwrap(), + c_column_families.len().try_into().unwrap(), c_column_families .iter() .map(|cf| cf.as_ptr()) .collect::>() .as_ptr(), - cf_options.as_ptr(), + cf_options.as_ptr() as *const *const rocksdb_options_t, cf_handles.as_mut_ptr(), - )) - .map_err(|e| { - rocksdb_options_destroy(options); - rocksdb_transactiondb_options_destroy(txn_options); - if let Some(env) = env { - rocksdb_env_destroy(env); - } - e - })?; + ))?; assert!(!db.is_null(), "rocksdb_create returned null"); for handle in &cf_handles { if handle.is_null() { rocksdb_transactiondb_close(db); - rocksdb_options_destroy(options); - rocksdb_transactiondb_options_destroy(txn_options); - if let Some(env) = env { - rocksdb_env_destroy(env); - } return Err(other_error( "Received null column family handle from RocksDB.", )); @@ -220,6 +263,12 @@ impl Db { "rocksdb_flushoptions_create returned null" ); + let compaction_options = rocksdb_compactoptions_create(); + assert!( + !compaction_options.is_null(), + "rocksdb_compactoptions_create returned null" + ); + Ok(DbHandler { db, options, @@ -228,24 +277,48 @@ impl Db { write_options, low_priority_write_options, flush_options, + compaction_options, env, - column_families, + column_family_names, cf_handles, + cf_options, + cf_compaction_filters, }) } } pub fn column_family(&self, name: &'static str) -> Option { - for (cf, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) { - if cf.name == name { + for (cf, cf_handle) in self.0.column_family_names.iter().zip(&self.0.cf_handles) { + if *cf == name { return Some(ColumnFamily(*cf_handle)); } } None } - pub fn flush(&self) -> Result<()> { - unsafe { ffi_result!(rocksdb_transactiondb_flush(self.0.db, self.0.flush_options)) } + pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> { + unsafe { + ffi_result!(rocksdb_transactiondb_flush_cf( + self.0.db, + self.0.flush_options, + column_family.0, + )) + } + } + + #[cfg(test)] + pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> { + unsafe { + ffi_result!(rocksdb_transactiondb_compact_range_cf_opt( + self.0.db, + column_family.0, + self.0.compaction_options, + ptr::null(), + 0, + ptr::null(), + 0 + )) + } } pub fn get( @@ -300,6 +373,30 @@ impl Db { } } + pub fn merge( + &self, + column_family: &ColumnFamily, + key: &[u8], + value: &[u8], + low_priority: bool, + ) -> Result<()> { + unsafe { + ffi_result!(rocksdb_transactiondb_merge_cf( + self.0.db, + if low_priority { + self.0.low_priority_write_options + } else { + self.0.write_options + }, + column_family.0, + key.as_ptr() as *const c_char, + key.len(), + value.as_ptr() as *const c_char, + value.len(), + )) + } + } + pub fn insert_empty( &self, column_family: &ColumnFamily, @@ -521,3 +618,161 @@ fn convert_error(ptr: *const c_char) -> Error { fn other_error(error: impl Into>) -> Error { Error::new(ErrorKind::InvalidInput, error) } + +pub struct MergeOperator { + pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec, + pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec, + pub name: CString, +} + +unsafe extern "C" fn merge_destructor(operator: *mut c_void) { + Box::from_raw(operator as *mut MergeOperator); +} + +unsafe extern "C" fn merge_full( + operator: *mut c_void, + key: *const c_char, + key_length: size_t, + existing_value: *const c_char, + existing_value_len: size_t, + operands_list: *const *const c_char, + operands_list_length: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, +) -> *mut c_char { + let operator = &*(operator as *const MergeOperator); + let num_operands = usize::try_from(num_operands).unwrap(); + let result = (operator.full)( + slice::from_raw_parts(key as *const u8, key_length), + if existing_value.is_null() { + None + } else { + Some(slice::from_raw_parts( + existing_value as *const u8, + existing_value_len, + )) + }, + SlicesIterator { + slices: slice::from_raw_parts(operands_list, num_operands), + lengths: slice::from_raw_parts(operands_list_length, num_operands), + cursor: 0, + }, + ); + *new_value_length = result.len(); + *success = 1_u8; + Box::into_raw(result.into_boxed_slice()) as *mut c_char +} + +pub unsafe extern "C" fn merge_partial( + operator: *mut c_void, + key: *const c_char, + key_length: size_t, + operands_list: *const *const c_char, + operands_list_length: *const size_t, + num_operands: c_int, + success: *mut u8, + new_value_length: *mut size_t, +) -> *mut c_char { + let operator = &*(operator as *const MergeOperator); + let num_operands = usize::try_from(num_operands).unwrap(); + let result = (operator.partial)( + slice::from_raw_parts(key as *const u8, key_length), + SlicesIterator { + slices: slice::from_raw_parts(operands_list, num_operands), + lengths: slice::from_raw_parts(operands_list_length, num_operands), + cursor: 0, + }, + ); + *new_value_length = result.len(); + *success = 1_u8; + Box::into_raw(result.into_boxed_slice()) as *mut c_char +} + +unsafe extern "C" fn merge_delete_value( + _operator: *mut c_void, + value: *const c_char, + value_length: size_t, +) { + if !value.is_null() { + Box::from_raw(slice::from_raw_parts_mut(value as *mut u8, value_length)); + } +} + +unsafe extern "C" fn merge_name(operator: *mut c_void) -> *const c_char { + let operator = &*(operator as *const MergeOperator); + operator.name.as_ptr() +} + +pub struct SlicesIterator<'a> { + slices: &'a [*const c_char], + lengths: &'a [size_t], + cursor: usize, +} + +impl<'a> Iterator for SlicesIterator<'a> { + type Item = &'a [u8]; + + fn next(&mut self) -> Option { + if self.cursor >= self.slices.len() { + None + } else { + let slice = unsafe { + slice::from_raw_parts( + self.slices[self.cursor] as *const u8, + self.lengths[self.cursor], + ) + }; + self.cursor += 1; + Some(slice) + } + } +} + +pub struct CompactionFilter { + pub filter: fn(&[u8], &[u8]) -> CompactionAction, + pub name: CString, +} + +#[allow(dead_code)] +pub enum CompactionAction { + Keep, + Remove, + Replace(Vec), +} + +unsafe extern "C" fn compactionfilter_destructor(filter: *mut c_void) { + Box::from_raw(filter as *mut CompactionFilter); +} + +unsafe extern "C" fn compactionfilter_filter( + filter: *mut c_void, + _level: c_int, + key: *const c_char, + key_length: size_t, + existing_value: *const c_char, + value_length: size_t, + new_value: *mut *mut c_char, + new_value_length: *mut size_t, + value_changed: *mut c_uchar, +) -> c_uchar { + let filter = &*(filter as *const CompactionFilter); + match (filter.filter)( + slice::from_raw_parts(key as *const u8, key_length), + slice::from_raw_parts(existing_value as *const u8, value_length), + ) { + CompactionAction::Keep => 0, + CompactionAction::Remove => 1, + CompactionAction::Replace(new_val) => { + *new_value_length = new_val.len(); + *value_changed = 1_u8; + *new_value = Box::into_raw(new_val.into_boxed_slice()) as *mut c_char; + 0 + } + } +} + +unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char { + let filter = &*(filter as *const CompactionFilter); + filter.name.as_ptr() +} diff --git a/rocksdb-sys/api/c.cc b/rocksdb-sys/api/c.cc index 87da6324..ff843a92 100644 --- a/rocksdb-sys/api/c.cc +++ b/rocksdb-sys/api/c.cc @@ -20,11 +20,26 @@ rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( return v; } -void rocksdb_transactiondb_flush( +void rocksdb_transactiondb_flush_cf( rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, + rocksdb_column_family_handle_t* column_family, char** errptr) { - SaveError(errptr, db->rep->Flush(options->rep)); + SaveError(errptr, db->rep->Flush(options->rep, column_family->rep)); +} + +void rocksdb_transactiondb_compact_range_cf_opt(rocksdb_transactiondb_t* db, + rocksdb_column_family_handle_t* column_family, + rocksdb_compactoptions_t* opt, + const char* start_key, size_t start_key_len, + const char* limit_key, size_t limit_key_len, + char** errptr) { + Slice a, b; + SaveError(errptr, db->rep->CompactRange( + opt->rep, column_family->rep, + // Pass nullptr Slice if corresponding "const char*" is nullptr + (start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr), + (limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr))); } rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) { diff --git a/rocksdb-sys/api/c.h b/rocksdb-sys/api/c.h index 916ccb48..280e9b47 100644 --- a/rocksdb-sys/api/c.h +++ b/rocksdb-sys/api/c.h @@ -11,9 +11,14 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi rocksdb_column_family_handle_t* column_family, const char* key, size_t keylen, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf( + rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, + rocksdb_column_family_handle_t* column_family, char** errptr); -extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush( - rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt( + rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* column_family, + rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len, + const char* limit_key, size_t limit_key_len, char** errptr); extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy( rocksdb_writeoptions_t*);