Storage: Manages strings GC using merge operation

Removes old strings during compaction
pull/171/head
Tpt 3 years ago
parent 045f40ccad
commit cddb5900ef
  1. 130
      lib/src/storage/fallback_backend.rs
  2. 181
      lib/src/storage/mod.rs
  3. 311
      lib/src/storage/rocksdb_backend.rs
  4. 19
      rocksdb-sys/api/c.cc
  5. 9
      rocksdb-sys/api/c.h

@ -1,21 +1,40 @@
//! TODO: This storage is dramatically naive. //! TODO: This storage is dramatically naive.
use std::collections::btree_map::Entry;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ffi::CString;
use std::io::Result; use std::io::Result;
use std::iter::{once, Once};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
pub struct ColumnFamilyDefinition { pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, BTreeMap<Vec<u8>, Vec<u8>>>>>); pub struct Db(Arc<RwLock<BTreeMap<ColumnFamily, Tree>>>);
#[derive(Default)]
struct Tree {
tree: BTreeMap<Vec<u8>, Vec<u8>>,
merge_operator: Option<MergeOperator>,
compaction_filter: Option<CompactionFilter>,
}
impl Db { impl Db {
pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> { pub fn new(column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
let mut trees = BTreeMap::new(); let mut trees = BTreeMap::new();
for cf in column_families { for cf in column_families {
trees.insert(ColumnFamily(cf.name), BTreeMap::default()); trees.insert(
ColumnFamily(cf.name),
Tree {
tree: BTreeMap::default(),
merge_operator: cf.merge_operator,
compaction_filter: cf.compaction_filter,
},
);
} }
trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists. trees.entry(ColumnFamily("default")).or_default(); // We make sure that "default" key exists.
Ok(Self(Arc::new(RwLock::new(trees)))) Ok(Self(Arc::new(RwLock::new(trees))))
@ -30,7 +49,7 @@ impl Db {
} }
} }
pub fn flush(&self) -> Result<()> { pub fn flush(&self, _column_family: &ColumnFamily) -> Result<()> {
Ok(()) Ok(())
} }
@ -41,6 +60,7 @@ impl Db {
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.unwrap() .unwrap()
.tree
.get(key) .get(key)
.map(|v| v.to_vec())) .map(|v| v.to_vec()))
} }
@ -52,6 +72,7 @@ impl Db {
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.unwrap() .unwrap()
.tree
.contains_key(key.as_ref())) .contains_key(key.as_ref()))
} }
@ -62,12 +83,18 @@ impl Db {
value: &[u8], value: &[u8],
_low_priority: bool, _low_priority: bool,
) -> Result<()> { ) -> Result<()> {
self.0 let mut db = self.0.write().unwrap();
.write() let tree = db.get_mut(column_family).unwrap();
.unwrap() let action = if let Some(filter) = &tree.compaction_filter {
.get_mut(column_family) (filter.filter)(key, value)
.unwrap() } else {
.insert(key.into(), value.into()); CompactionAction::Keep
};
match action {
CompactionAction::Keep => tree.tree.insert(key.into(), value.into()),
CompactionAction::Remove => tree.tree.remove(key),
CompactionAction::Replace(value) => tree.tree.insert(key.into(), value),
};
Ok(()) Ok(())
} }
@ -92,17 +119,70 @@ impl Db {
.unwrap() .unwrap()
.get_mut(column_family) .get_mut(column_family)
.unwrap() .unwrap()
.tree
.remove(key.as_ref()) .remove(key.as_ref())
.is_some()) .is_some())
} }
pub fn merge(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
_low_priority: bool,
) -> Result<()> {
let mut db = self.0.write().unwrap();
let tree = db.get_mut(column_family).unwrap();
match tree.tree.entry(key.into()) {
Entry::Vacant(e) => {
let value = if let Some(merge) = &tree.merge_operator {
(merge.full)(key, None, once(value))
} else {
value.into()
};
let action = if let Some(filter) = &tree.compaction_filter {
(filter.filter)(key, &value)
} else {
CompactionAction::Keep
};
match action {
CompactionAction::Keep => {
e.insert(value);
}
CompactionAction::Remove => (),
CompactionAction::Replace(value) => {
e.insert(value);
}
}
}
Entry::Occupied(mut e) => {
let value = if let Some(merge) = &tree.merge_operator {
(merge.full)(key, Some(&e.get()), once(value))
} else {
value.into()
};
let action = if let Some(filter) = &tree.compaction_filter {
(filter.filter)(key, &value)
} else {
CompactionAction::Keep
};
match action {
CompactionAction::Keep => e.insert(value),
CompactionAction::Remove => e.remove(),
CompactionAction::Replace(value) => e.insert(value),
};
}
}
Ok(())
}
pub fn iter(&self, column_family: &ColumnFamily) -> Iter { pub fn iter(&self, column_family: &ColumnFamily) -> Iter {
self.scan_prefix(column_family, &[]) self.scan_prefix(column_family, &[])
} }
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter { pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Iter {
let trees = self.0.read().unwrap(); let trees = self.0.read().unwrap();
let tree = trees.get(column_family).unwrap(); let tree = &trees.get(column_family).unwrap().tree;
let data: Vec<_> = if prefix.is_empty() { let data: Vec<_> = if prefix.is_empty() {
tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect() tree.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
} else { } else {
@ -117,7 +197,14 @@ impl Db {
} }
pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> { pub fn len(&self, column_family: &ColumnFamily) -> Result<usize> {
Ok(self.0.read().unwrap().get(column_family).unwrap().len()) Ok(self
.0
.read()
.unwrap()
.get(column_family)
.unwrap()
.tree
.len())
} }
pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> { pub fn is_empty(&self, column_family: &ColumnFamily) -> Result<bool> {
@ -127,6 +214,7 @@ impl Db {
.unwrap() .unwrap()
.get(column_family) .get(column_family)
.unwrap() .unwrap()
.tree
.is_empty()) .is_empty())
} }
} }
@ -156,3 +244,23 @@ impl Iter {
Ok(()) Ok(())
} }
} }
pub struct MergeOperator {
pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec<u8>,
pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec<u8>,
pub name: CString,
}
pub type SlicesIterator<'a> = Once<&'a [u8]>;
pub struct CompactionFilter {
pub filter: fn(&[u8], &[u8]) -> CompactionAction,
pub name: CString,
}
#[allow(dead_code)]
pub enum CompactionAction {
Keep,
Remove,
Replace(Vec<u8>),
}

@ -8,9 +8,16 @@ use crate::storage::binary_encoder::{
}; };
use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder}; use crate::storage::numeric_encoder::{EncodedQuad, EncodedTerm, StrHash, StrLookup, TermEncoder};
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use fallback_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use fallback_backend::{
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator,
};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use rocksdb_backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter}; use rocksdb_backend::{
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator,
};
use std::ffi::CString;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::path::Path; use std::path::Path;
@ -66,20 +73,104 @@ impl Storage {
fn column_families() -> Vec<ColumnFamilyDefinition> { fn column_families() -> Vec<ColumnFamilyDefinition> {
vec![ vec![
ColumnFamilyDefinition { name: ID2STR_CF }, ColumnFamilyDefinition {
ColumnFamilyDefinition { name: SPOG_CF }, name: ID2STR_CF,
ColumnFamilyDefinition { name: POSG_CF }, merge_operator: Some(Self::str2id_merge()),
ColumnFamilyDefinition { name: OSPG_CF }, compaction_filter: Some(Self::str2id_filter()),
ColumnFamilyDefinition { name: GSPO_CF }, },
ColumnFamilyDefinition { name: GPOS_CF }, ColumnFamilyDefinition {
ColumnFamilyDefinition { name: GOSP_CF }, name: SPOG_CF,
ColumnFamilyDefinition { name: DSPO_CF }, merge_operator: None,
ColumnFamilyDefinition { name: DPOS_CF }, compaction_filter: None,
ColumnFamilyDefinition { name: DOSP_CF }, },
ColumnFamilyDefinition { name: GRAPHS_CF }, ColumnFamilyDefinition {
name: POSG_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: OSPG_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: GSPO_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: GPOS_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: GOSP_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: DSPO_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: DPOS_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: DOSP_CF,
merge_operator: None,
compaction_filter: None,
},
ColumnFamilyDefinition {
name: GRAPHS_CF,
merge_operator: None,
compaction_filter: None,
},
] ]
} }
fn str2id_merge() -> MergeOperator {
fn merge_counted_values<'a>(values: impl Iterator<Item = &'a [u8]>) -> Vec<u8> {
let (counter, str) =
values.fold((0, [].as_ref()), |(prev_counter, prev_str), current| {
(
prev_counter + i32::from_be_bytes(current[..4].try_into().unwrap()),
if prev_str.is_empty() {
&current[4..]
} else {
prev_str
},
)
});
let mut buffer = Vec::with_capacity(str.len() + 4);
buffer.extend_from_slice(&counter.to_be_bytes());
buffer.extend_from_slice(str);
buffer
}
MergeOperator {
full: |_, previous, values| merge_counted_values(previous.into_iter().chain(values)),
partial: |_, values| merge_counted_values(values),
name: CString::new("id2str_merge").unwrap(),
}
}
fn str2id_filter() -> CompactionFilter {
CompactionFilter {
filter: |_, value| {
let counter = i32::from_be_bytes(value[..4].try_into().unwrap());
if counter > 0 {
CompactionAction::Keep
} else {
CompactionAction::Remove
}
},
name: CString::new("id2str_compaction_filter").unwrap(),
}
}
fn setup(db: Db) -> std::io::Result<Self> { fn setup(db: Db) -> std::io::Result<Self> {
let this = Self { let this = Self {
default_cf: db.column_family(DEFAULT_CF).unwrap(), default_cf: db.column_family(DEFAULT_CF).unwrap(),
@ -107,24 +198,26 @@ impl Storage {
.insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?; .insert_empty(&this.graphs_cf, &encode_term(&quad.graph_name), false)?;
} }
} }
this.db.flush(&this.graphs_cf)?;
version = 1; version = 1;
this.set_version(version)?; this.set_version(version)?;
this.db.flush()?; this.db.flush(&this.default_cf)?;
} }
if version == 1 { if version == 1 {
// We migrate to v2 // We migrate to v2
let mut iter = this.db.iter(&this.id2str_cf); let mut iter = this.db.iter(&this.id2str_cf);
while let (Some(key), Some(value)) = (iter.key(), iter.value()) { while let (Some(key), Some(value)) = (iter.key(), iter.value()) {
let mut new_value = Vec::with_capacity(value.len() + 4); let mut new_value = Vec::with_capacity(value.len() + 4);
new_value.extend_from_slice(&u32::MAX.to_be_bytes()); new_value.extend_from_slice(&i32::MAX.to_be_bytes());
new_value.extend_from_slice(value); new_value.extend_from_slice(value);
this.db.insert(&this.id2str_cf, key, &new_value, false)?; this.db.insert(&this.id2str_cf, key, &new_value, false)?;
iter.next(); iter.next();
} }
iter.status()?; iter.status()?;
this.db.flush(&this.id2str_cf)?;
version = 2; version = 2;
this.set_version(version)?; this.set_version(version)?;
this.db.flush()?; this.db.flush(&this.default_cf)?;
} }
match version { match version {
@ -684,13 +777,30 @@ impl Storage {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn flush(&self) -> std::io::Result<()> { pub fn flush(&self) -> std::io::Result<()> {
self.db.flush() self.db.flush(&self.default_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gpos_cf)?;
self.db.flush(&self.gosp_cf)?;
self.db.flush(&self.spog_cf)?;
self.db.flush(&self.posg_cf)?;
self.db.flush(&self.ospg_cf)?;
self.db.flush(&self.dspo_cf)?;
self.db.flush(&self.dpos_cf)?;
self.db.flush(&self.dosp_cf)?;
self.db.flush(&self.id2str_cf)
} }
pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> { pub fn get_str(&self, key: &StrHash) -> std::io::Result<Option<String>> {
self.db self.db
.get(&self.id2str_cf, &key.to_be_bytes())? .get(&self.id2str_cf, &key.to_be_bytes())?
.map(|v| String::from_utf8(v[4..].to_vec())) .and_then(|v| {
let count = i32::from_be_bytes(v[..4].try_into().unwrap());
if count > 0 {
Some(String::from_utf8(v[4..].to_vec()))
} else {
None
}
})
.transpose() .transpose()
.map_err(invalid_data_error) .map_err(invalid_data_error)
} }
@ -774,37 +884,20 @@ impl TermEncoder for Storage {
type Error = std::io::Error; type Error = std::io::Error;
fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> { fn insert_str(&self, key: &StrHash, value: &str) -> std::io::Result<()> {
if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? {
let mut value = value.to_vec();
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?);
let new_number = number.saturating_add(1);
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?
} else {
let mut buffer = Vec::with_capacity(value.len() + 4); let mut buffer = Vec::with_capacity(value.len() + 4);
buffer.extend_from_slice(&1_u32.to_be_bytes()); buffer.extend_from_slice(&1_i32.to_be_bytes());
buffer.extend_from_slice(value.as_bytes()); buffer.extend_from_slice(value.as_bytes());
self.db self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)?; .merge(&self.id2str_cf, &key.to_be_bytes(), &buffer, false)
}
Ok(())
} }
fn remove_str(&self, key: &StrHash) -> std::io::Result<()> { fn remove_str(&self, key: &StrHash) -> std::io::Result<()> {
if let Some(value) = self.db.get(&self.id2str_cf, &key.to_be_bytes())? { self.db.merge(
let number = u32::from_be_bytes(value[..4].try_into().map_err(invalid_data_error)?); &self.id2str_cf,
let new_number = number.saturating_sub(1); &key.to_be_bytes(),
if new_number == 0 { &(-1_i32).to_be_bytes(),
self.db.remove(&self.id2str_cf, &key.to_be_bytes(), true)?; true,
} else { )
let mut value = value.to_vec();
value[..4].copy_from_slice(&new_number.to_be_bytes());
self.db
.insert(&self.id2str_cf, &key.to_be_bytes(), &value, true)?;
}
}
Ok(())
} }
} }
@ -860,6 +953,8 @@ mod tests {
storage.insert(quad)?; storage.insert(quad)?;
storage.insert(quad2)?; storage.insert(quad2)?;
storage.remove(quad2)?; storage.remove(quad2)?;
storage.flush()?;
storage.db.compact(&storage.id2str_cf)?;
assert!(storage assert!(storage
.get_str(&StrHash::new("http://example.com/s"))? .get_str(&StrHash::new("http://example.com/s"))?
.is_some()); .is_some());

@ -1,9 +1,11 @@
//! Code inspired by [https://github.com/rust-rocksdb/rust-rocksdb][Rust RocksDB] under Apache License 2.0. //! Code inspired by [https://github.com/rust-rocksdb/rust-rocksdb][Rust RocksDB] under Apache License 2.0.
//!
//! TODO: still has some memory leaks if the database opening fails
#![allow(unsafe_code)] #![allow(unsafe_code)]
use crate::error::invalid_input_error; use crate::error::invalid_input_error;
use libc::{self, c_char, c_void}; use libc::{self, c_char, c_int, c_uchar, c_void, size_t};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use std::borrow::Borrow; use std::borrow::Borrow;
use std::env::temp_dir; use std::env::temp_dir;
@ -39,6 +41,8 @@ macro_rules! ffi_result_impl {
pub struct ColumnFamilyDefinition { pub struct ColumnFamilyDefinition {
pub name: &'static str, pub name: &'static str,
pub merge_operator: Option<MergeOperator>,
pub compaction_filter: Option<CompactionFilter>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -55,9 +59,12 @@ struct DbHandler {
write_options: *mut rocksdb_writeoptions_t, write_options: *mut rocksdb_writeoptions_t,
low_priority_write_options: *mut rocksdb_writeoptions_t, low_priority_write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t, flush_options: *mut rocksdb_flushoptions_t,
compaction_options: *mut rocksdb_compactoptions_t,
env: Option<*mut rocksdb_env_t>, env: Option<*mut rocksdb_env_t>,
column_families: Vec<ColumnFamilyDefinition>, column_family_names: Vec<&'static str>,
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>,
cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>,
} }
impl Drop for DbHandler { impl Drop for DbHandler {
@ -67,15 +74,22 @@ impl Drop for DbHandler {
rocksdb_column_family_handle_destroy(*cf_handle); rocksdb_column_family_handle_destroy(*cf_handle);
} }
rocksdb_transactiondb_close(self.db); rocksdb_transactiondb_close(self.db);
for cf_option in &self.cf_options {
rocksdb_options_destroy(*cf_option);
}
rocksdb_readoptions_destroy(self.read_options); rocksdb_readoptions_destroy(self.read_options);
rocksdb_writeoptions_destroy(self.write_options); rocksdb_writeoptions_destroy(self.write_options);
rocksdb_writeoptions_destroy(self.low_priority_write_options); rocksdb_writeoptions_destroy(self.low_priority_write_options);
rocksdb_flushoptions_destroy(self.flush_options); rocksdb_flushoptions_destroy(self.flush_options);
rocksdb_compactoptions_destroy(self.compaction_options);
rocksdb_transactiondb_options_destroy(self.txn_options); rocksdb_transactiondb_options_destroy(self.txn_options);
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
if let Some(env) = self.env { if let Some(env) = self.env {
rocksdb_env_destroy(env); rocksdb_env_destroy(env);
} }
for cf_compact in &self.cf_compaction_filters {
rocksdb_compactionfilter_destroy(*cf_compact);
}
} }
} }
} }
@ -145,47 +159,76 @@ impl Db {
}; };
if !column_families.iter().any(|c| c.name == "default") { if !column_families.iter().any(|c| c.name == "default") {
column_families.push(ColumnFamilyDefinition { name: "default" }) column_families.push(ColumnFamilyDefinition {
name: "default",
merge_operator: None,
compaction_filter: None,
})
} }
let c_column_families = column_families let column_family_names = column_families.iter().map(|c| c.name).collect::<Vec<_>>();
let c_column_families = column_family_names
.iter() .iter()
.map(|cf| CString::new(cf.name)) .map(|name| CString::new(*name))
.collect::<std::result::Result<Vec<_>, _>>() .collect::<std::result::Result<Vec<_>, _>>()
.map_err(invalid_input_error)?; .map_err(invalid_input_error)?;
let cf_options: Vec<*const rocksdb_options_t> = vec![options; column_families.len()]; let mut cf_compaction_filters = Vec::new();
let cf_options = column_families
.into_iter()
.map(|cf| {
let options = rocksdb_options_create_copy(options);
if let Some(merge) = cf.merge_operator {
// mergeoperator delete is done automatically
let merge = rocksdb_mergeoperator_create(
Box::into_raw(Box::new(merge)) as *mut c_void,
Some(merge_destructor),
Some(merge_full),
Some(merge_partial),
Some(merge_delete_value),
Some(merge_name),
);
assert!(
!merge.is_null(),
"rocksdb_mergeoperator_create returned null"
);
rocksdb_options_set_merge_operator(options, merge);
}
if let Some(compact) = cf.compaction_filter {
let compact = rocksdb_compactionfilter_create(
Box::into_raw(Box::new(compact)) as *mut c_void,
Some(compactionfilter_destructor),
Some(compactionfilter_filter),
Some(compactionfilter_name),
);
assert!(
!compact.is_null(),
"rocksdb_compactionfilter_create returned null"
);
rocksdb_options_set_compaction_filter(options, compact);
cf_compaction_filters.push(compact);
}
options
})
.collect::<Vec<_>>();
let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> = let mut cf_handles: Vec<*mut rocksdb_column_family_handle_t> =
vec![ptr::null_mut(); column_families.len()]; vec![ptr::null_mut(); column_family_names.len()];
let db = ffi_result!(rocksdb_transactiondb_open_column_families( let db = ffi_result!(rocksdb_transactiondb_open_column_families(
options, options,
txn_options, txn_options,
c_path.as_ptr(), c_path.as_ptr(),
column_families.len().try_into().unwrap(), c_column_families.len().try_into().unwrap(),
c_column_families c_column_families
.iter() .iter()
.map(|cf| cf.as_ptr()) .map(|cf| cf.as_ptr())
.collect::<Vec<_>>() .collect::<Vec<_>>()
.as_ptr(), .as_ptr(),
cf_options.as_ptr(), cf_options.as_ptr() as *const *const rocksdb_options_t,
cf_handles.as_mut_ptr(), cf_handles.as_mut_ptr(),
)) ))?;
.map_err(|e| {
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
e
})?;
assert!(!db.is_null(), "rocksdb_create returned null"); assert!(!db.is_null(), "rocksdb_create returned null");
for handle in &cf_handles { for handle in &cf_handles {
if handle.is_null() { if handle.is_null() {
rocksdb_transactiondb_close(db); rocksdb_transactiondb_close(db);
rocksdb_options_destroy(options);
rocksdb_transactiondb_options_destroy(txn_options);
if let Some(env) = env {
rocksdb_env_destroy(env);
}
return Err(other_error( return Err(other_error(
"Received null column family handle from RocksDB.", "Received null column family handle from RocksDB.",
)); ));
@ -220,6 +263,12 @@ impl Db {
"rocksdb_flushoptions_create returned null" "rocksdb_flushoptions_create returned null"
); );
let compaction_options = rocksdb_compactoptions_create();
assert!(
!compaction_options.is_null(),
"rocksdb_compactoptions_create returned null"
);
Ok(DbHandler { Ok(DbHandler {
db, db,
options, options,
@ -228,24 +277,48 @@ impl Db {
write_options, write_options,
low_priority_write_options, low_priority_write_options,
flush_options, flush_options,
compaction_options,
env, env,
column_families, column_family_names,
cf_handles, cf_handles,
cf_options,
cf_compaction_filters,
}) })
} }
} }
pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> { pub fn column_family(&self, name: &'static str) -> Option<ColumnFamily> {
for (cf, cf_handle) in self.0.column_families.iter().zip(&self.0.cf_handles) { for (cf, cf_handle) in self.0.column_family_names.iter().zip(&self.0.cf_handles) {
if cf.name == name { if *cf == name {
return Some(ColumnFamily(*cf_handle)); return Some(ColumnFamily(*cf_handle));
} }
} }
None None
} }
pub fn flush(&self) -> Result<()> { pub fn flush(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe { ffi_result!(rocksdb_transactiondb_flush(self.0.db, self.0.flush_options)) } unsafe {
ffi_result!(rocksdb_transactiondb_flush_cf(
self.0.db,
self.0.flush_options,
column_family.0,
))
}
}
#[cfg(test)]
pub fn compact(&self, column_family: &ColumnFamily) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_compact_range_cf_opt(
self.0.db,
column_family.0,
self.0.compaction_options,
ptr::null(),
0,
ptr::null(),
0
))
}
} }
pub fn get( pub fn get(
@ -300,6 +373,30 @@ impl Db {
} }
} }
pub fn merge(
&self,
column_family: &ColumnFamily,
key: &[u8],
value: &[u8],
low_priority: bool,
) -> Result<()> {
unsafe {
ffi_result!(rocksdb_transactiondb_merge_cf(
self.0.db,
if low_priority {
self.0.low_priority_write_options
} else {
self.0.write_options
},
column_family.0,
key.as_ptr() as *const c_char,
key.len(),
value.as_ptr() as *const c_char,
value.len(),
))
}
}
pub fn insert_empty( pub fn insert_empty(
&self, &self,
column_family: &ColumnFamily, column_family: &ColumnFamily,
@ -521,3 +618,161 @@ fn convert_error(ptr: *const c_char) -> Error {
fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Error { fn other_error(error: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> Error {
Error::new(ErrorKind::InvalidInput, error) Error::new(ErrorKind::InvalidInput, error)
} }
pub struct MergeOperator {
pub full: fn(&[u8], Option<&[u8]>, SlicesIterator<'_>) -> Vec<u8>,
pub partial: fn(&[u8], SlicesIterator<'_>) -> Vec<u8>,
pub name: CString,
}
unsafe extern "C" fn merge_destructor(operator: *mut c_void) {
Box::from_raw(operator as *mut MergeOperator);
}
unsafe extern "C" fn merge_full(
operator: *mut c_void,
key: *const c_char,
key_length: size_t,
existing_value: *const c_char,
existing_value_len: size_t,
operands_list: *const *const c_char,
operands_list_length: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let operator = &*(operator as *const MergeOperator);
let num_operands = usize::try_from(num_operands).unwrap();
let result = (operator.full)(
slice::from_raw_parts(key as *const u8, key_length),
if existing_value.is_null() {
None
} else {
Some(slice::from_raw_parts(
existing_value as *const u8,
existing_value_len,
))
},
SlicesIterator {
slices: slice::from_raw_parts(operands_list, num_operands),
lengths: slice::from_raw_parts(operands_list_length, num_operands),
cursor: 0,
},
);
*new_value_length = result.len();
*success = 1_u8;
Box::into_raw(result.into_boxed_slice()) as *mut c_char
}
pub unsafe extern "C" fn merge_partial(
operator: *mut c_void,
key: *const c_char,
key_length: size_t,
operands_list: *const *const c_char,
operands_list_length: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let operator = &*(operator as *const MergeOperator);
let num_operands = usize::try_from(num_operands).unwrap();
let result = (operator.partial)(
slice::from_raw_parts(key as *const u8, key_length),
SlicesIterator {
slices: slice::from_raw_parts(operands_list, num_operands),
lengths: slice::from_raw_parts(operands_list_length, num_operands),
cursor: 0,
},
);
*new_value_length = result.len();
*success = 1_u8;
Box::into_raw(result.into_boxed_slice()) as *mut c_char
}
unsafe extern "C" fn merge_delete_value(
_operator: *mut c_void,
value: *const c_char,
value_length: size_t,
) {
if !value.is_null() {
Box::from_raw(slice::from_raw_parts_mut(value as *mut u8, value_length));
}
}
unsafe extern "C" fn merge_name(operator: *mut c_void) -> *const c_char {
let operator = &*(operator as *const MergeOperator);
operator.name.as_ptr()
}
pub struct SlicesIterator<'a> {
slices: &'a [*const c_char],
lengths: &'a [size_t],
cursor: usize,
}
impl<'a> Iterator for SlicesIterator<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<Self::Item> {
if self.cursor >= self.slices.len() {
None
} else {
let slice = unsafe {
slice::from_raw_parts(
self.slices[self.cursor] as *const u8,
self.lengths[self.cursor],
)
};
self.cursor += 1;
Some(slice)
}
}
}
pub struct CompactionFilter {
pub filter: fn(&[u8], &[u8]) -> CompactionAction,
pub name: CString,
}
#[allow(dead_code)]
pub enum CompactionAction {
Keep,
Remove,
Replace(Vec<u8>),
}
unsafe extern "C" fn compactionfilter_destructor(filter: *mut c_void) {
Box::from_raw(filter as *mut CompactionFilter);
}
unsafe extern "C" fn compactionfilter_filter(
filter: *mut c_void,
_level: c_int,
key: *const c_char,
key_length: size_t,
existing_value: *const c_char,
value_length: size_t,
new_value: *mut *mut c_char,
new_value_length: *mut size_t,
value_changed: *mut c_uchar,
) -> c_uchar {
let filter = &*(filter as *const CompactionFilter);
match (filter.filter)(
slice::from_raw_parts(key as *const u8, key_length),
slice::from_raw_parts(existing_value as *const u8, value_length),
) {
CompactionAction::Keep => 0,
CompactionAction::Remove => 1,
CompactionAction::Replace(new_val) => {
*new_value_length = new_val.len();
*value_changed = 1_u8;
*new_value = Box::into_raw(new_val.into_boxed_slice()) as *mut c_char;
0
}
}
}
unsafe extern "C" fn compactionfilter_name(filter: *mut c_void) -> *const c_char {
let filter = &*(filter as *const CompactionFilter);
filter.name.as_ptr()
}

@ -20,11 +20,26 @@ rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
return v; return v;
} }
void rocksdb_transactiondb_flush( void rocksdb_transactiondb_flush_cf(
rocksdb_transactiondb_t* db, rocksdb_transactiondb_t* db,
const rocksdb_flushoptions_t* options, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family,
char** errptr) { char** errptr) {
SaveError(errptr, db->rep->Flush(options->rep)); SaveError(errptr, db->rep->Flush(options->rep, column_family->rep));
}
void rocksdb_transactiondb_compact_range_cf_opt(rocksdb_transactiondb_t* db,
rocksdb_column_family_handle_t* column_family,
rocksdb_compactoptions_t* opt,
const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len,
char** errptr) {
Slice a, b;
SaveError(errptr, db->rep->CompactRange(
opt->rep, column_family->rep,
// Pass nullptr Slice if corresponding "const char*" is nullptr
(start_key ? (a = Slice(start_key, start_key_len), &a) : nullptr),
(limit_key ? (b = Slice(limit_key, limit_key_len), &b) : nullptr)));
} }
rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) { rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(rocksdb_writeoptions_t* options) {

@ -11,9 +11,14 @@ extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pi
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, char** errptr); size_t keylen, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush_cf(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options,
rocksdb_column_family_handle_t* column_family, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_flush( extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_compact_range_cf_opt(
rocksdb_transactiondb_t* db, const rocksdb_flushoptions_t* options, char** errptr); rocksdb_transactiondb_t* db, rocksdb_column_family_handle_t* column_family,
rocksdb_compactoptions_t* opt, const char* start_key, size_t start_key_len,
const char* limit_key, size_t limit_key_len, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy( extern ROCKSDB_LIBRARY_API rocksdb_writeoptions_t* rocksdb_writeoptions_create_copy(
rocksdb_writeoptions_t*); rocksdb_writeoptions_t*);

Loading…
Cancel
Save