Merge pull request #1 from spacejam/tan_merge

Tan merge
master
Tyler Neely 10 years ago
commit 785ead1b4d
  1. 28
      src/ffi.rs
  2. 237
      src/rocksdb.rs

@ -112,26 +112,28 @@ extern {
options: RocksDBOptions, path: *const i8, err: *mut i8); options: RocksDBOptions, path: *const i8, err: *mut i8);
pub fn rocksdb_repair_db( pub fn rocksdb_repair_db(
options: RocksDBOptions, path: *const i8, err: *mut i8); options: RocksDBOptions, path: *const i8, err: *mut i8);
// Merge
// Merge Operator pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, err: *mut i8);
pub fn rocksdb_mergeoperator_create( pub fn rocksdb_mergeoperator_create(
state: *mut c_void, state: *mut c_void,
destroy: extern fn(*mut c_void) -> (), destroy: extern fn(*mut c_void) -> (),
full_merge: extern fn ( full_merge: extern fn (
arg: *mut c_void, key: *const c_char, key_len: *mut size_t, arg: *mut c_void, key: *const c_char, key_len: size_t,
existing_value: *const c_char, existing_value_len: *mut size_t, existing_value: *const c_char, existing_value_len: size_t,
operands_list: &[*const c_char], operands_list_len: *const size_t, operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int, num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t success: *mut u8, new_value_length: *mut size_t
) -> *const c_char, ) -> *const c_char,
partial_merge: extern fn( partial_merge: extern fn(
*mut c_void, key: *const c_char, key_len: *mut size_t, arg: *mut c_void, key: *const c_char, key_len: size_t,
operands_list: &[*const c_char], operands_list_len: *const size_t, operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int, num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t success: *mut u8, new_value_length: *mut size_t
) -> *const c_char, ) -> *const c_char,
delete_value: extern fn(*mut c_void, value: *const c_char, delete_value: Option<extern "C" fn(*mut c_void, value: *const c_char,
value_len: *mut size_t) -> (), value_len: *mut size_t) -> ()>,
name_fn: extern fn(*mut c_void) -> *const c_char, name_fn: extern fn(*mut c_void) -> *const c_char,
) -> RocksDBMergeOperator; ) -> RocksDBMergeOperator;
pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator); pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator);
@ -152,14 +154,13 @@ fn internal() {
rocksdb_options_optimize_level_style_compaction(opts, 0); rocksdb_options_optimize_level_style_compaction(opts, 0);
rocksdb_options_set_create_if_missing(opts, 1); rocksdb_options_set_create_if_missing(opts, 1);
let rustpath = "internaltest"; let rustpath = "_rust_rocksdb_internaltest";
let cpath = rustpath.to_c_str(); let cpath = rustpath.to_c_str();
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
let err = 0 as *mut i8; let err = 0 as *mut i8;
let db = rocksdb_open(opts, cpath_ptr, err); let db = rocksdb_open(opts, cpath_ptr, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
let writeopts = rocksdb_writeoptions_create(); let writeopts = rocksdb_writeoptions_create();
let RocksDBWriteOptions(write_opt_ptr) = writeopts; let RocksDBWriteOptions(write_opt_ptr) = writeopts;
@ -170,18 +171,17 @@ fn internal() {
rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err); rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
let readopts = rocksdb_readoptions_create(); let readopts = rocksdb_readoptions_create();
let RocksDBReadOptions(read_opts_ptr) = readopts; let RocksDBReadOptions(read_opts_ptr) = readopts;
assert!(read_opts_ptr.is_not_null()); assert!(read_opts_ptr.is_not_null());
libc::free(err as *mut c_void);
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err); rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
rocksdb_close(db); rocksdb_close(db);
rocksdb_destroy_db(opts, cpath_ptr, err);
assert!(err.is_null());
} }
} }

@ -1,9 +1,13 @@
extern crate libc; extern crate libc;
use self::libc::{c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::io::{IoError}; use std::io::{IoError};
use std::c_vec::CVec; use std::c_vec::CVec;
use std::c_str::CString; use std::c_str::CString;
use std::str::from_utf8; use std::str::from_utf8;
use std::string::raw::from_buf_len;
use std::ptr;
use std::mem;
use std::slice;
use rocksdb_ffi; use rocksdb_ffi;
@ -50,8 +54,20 @@ impl RocksDBOptions {
} }
} }
pub fn set_merge_operator(&self, mo: rocksdb_ffi::RocksDBMergeOperator) { pub fn add_merge_operator<'a>(&self, name: &str, merge_fn: for <'a> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
let cb = box MergeOperatorCallback {
name: name.to_c_str(),
merge_fn: merge_fn,
};
unsafe { unsafe {
let mo = rocksdb_ffi::rocksdb_mergeoperator_create(
mem::transmute(cb),
destructor_callback,
full_merge_callback,
partial_merge_callback,
None,
name_callback);
rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo); rocksdb_ffi::rocksdb_options_set_merge_operator(self.inner, mo);
} }
} }
@ -95,6 +111,29 @@ impl RocksDB {
} }
} }
pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), String> {
unsafe {
let cpath = path.to_c_str();
let cpath_ptr = cpath.as_ptr();
//TODO test path here, as if rocksdb fails it will just crash the
// process currently
let err = 0 as *mut i8;
let result = rocksdb_ffi::rocksdb_destroy_db(opts.inner, cpath_ptr, err);
if err.is_not_null() {
let cs = CString::new(err as *const i8, true);
match cs.as_str() {
Some(error_string) =>
return Err(error_string.to_string()),
None =>
return Err("Could not initialize database.".to_string()),
}
}
Ok(())
}
}
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
@ -121,6 +160,33 @@ impl RocksDB {
} }
} }
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if err.is_not_null() {
let cs = CString::new(err as *const i8, true);
match cs.as_str() {
Some(error_string) =>
return Err(error_string.to_string()),
None => {
let ie = IoError::last_error();
return Err(format!(
"ERROR: desc:{}, details:{}",
ie.desc,
ie.detail.unwrap_or_else(
|| {"none provided by OS".to_string()})))
}
}
}
return Ok(())
}
}
pub fn get<'a>(&self, key: &[u8]) -> pub fn get<'a>(&self, key: &[u8]) ->
RocksDBResult<'a, RocksDBVector, String> { RocksDBResult<'a, RocksDBVector, String> {
unsafe { unsafe {
@ -288,7 +354,8 @@ impl <'a,T,E> RocksDBResult<'a,T,E> {
#[allow(dead_code)] #[allow(dead_code)]
#[test] #[test]
fn external() { fn external() {
let db = RocksDB::open_default("externaltest").unwrap(); let path = "_rust_rocksdb_externaltest";
let db = RocksDB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111"); let p = db.put(b"k1", b"v1111");
assert!(p.is_ok()); assert!(p.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
@ -296,4 +363,168 @@ fn external() {
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();
let opts = RocksDBOptions::new();
assert!(RocksDB::destroy(opts, path).is_ok());
}
pub struct MergeOperands<'a> {
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: uint,
cursor: uint,
}
impl <'a> MergeOperands<'a> {
fn new<'a>(operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int) -> MergeOperands<'a> {
assert!(num_operands >= 0);
MergeOperands {
operands_list: operands_list,
operands_list_len: operands_list_len,
num_operands: num_operands as uint,
cursor: 0,
}
}
}
impl <'a> Iterator<&'a [u8]> for &'a mut MergeOperands<'a> {
fn next(&mut self) -> Option<&'a [u8]> {
use std::raw::Slice;
match self.cursor == self.num_operands {
true => None,
false => {
unsafe {
let base = self.operands_list as uint;
let base_len = self.operands_list_len as uint;
let spacing = mem::size_of::<*const *const u8>();
let spacing_len = mem::size_of::<*const size_t>();
let len_ptr = (base_len + (spacing_len * self.cursor)) as *const size_t;
let len = *len_ptr as uint;
let ptr = base + (spacing * self.cursor);
let op = from_buf_len(*(ptr as *const *const u8), len);
let des: Option<uint> = from_str(op.as_slice());
self.cursor += 1;
Some(mem::transmute(Slice{data:*(ptr as *const *const u8) as *const u8, len: len}))
}
}
}
}
fn size_hint(&self) -> (uint, Option<uint>) {
let remaining = self.num_operands - self.cursor;
(remaining, Some(remaining))
}
}
struct MergeOperatorCallback {
name: CString,
merge_fn: for <'b> fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>,
}
extern "C" fn destructor_callback(raw_cb: *mut c_void) {
// turn this back into a local variable so rust will reclaim it
let _: Box<MergeOperatorCallback> = unsafe {mem::transmute(raw_cb)};
}
extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let ptr = cb.name.as_ptr();
ptr as *const c_char
}
}
extern "C" fn full_merge_callback(
raw_cb: *mut c_void, key: *const c_char, key_len: size_t,
existing_value: *const c_char, existing_value_len: size_t,
operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
let mut result = (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands);
result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = result.len() as size_t;
*success = 1 as u8;
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
buf as *const c_char
}
}
extern "C" fn partial_merge_callback(
raw_cb: *mut c_void, key: *const c_char, key_len: size_t,
operands_list: *const *const c_char, operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
let cb: &mut MergeOperatorCallback = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = from_buf_len(key as *const u8, key_len as uint);
let mut result = (cb.merge_fn)(key.as_bytes(), None, operands);
result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance
let buf = libc::malloc(result.len() as size_t);
assert!(buf.is_not_null());
*new_value_length = 1 as size_t;
*success = 1 as u8;
ptr::copy_memory(&mut *buf, result.as_ptr() as *const c_void, result.len());
buf as *const c_char
}
}
fn test_provided_merge<'a>(new_key: &[u8], existing_val: Option<&[u8]>,
mut operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().val0());
match existing_val {
Some(v) => result.push_all(v),
None => (),
}
for op in operands {
result.push_all(op);
}
result
}
#[allow(dead_code)]
#[test]
fn mergetest() {
let path = "_rust_rocksdb_mergetest";
unsafe {
let opts = RocksDBOptions::new();
opts.create_if_missing(true);
opts.add_merge_operator("test operator", test_provided_merge);
let db = RocksDB::open(opts, path).unwrap();
let p = db.put(b"k1", b"a");
assert!(p.is_ok());
db.merge(b"k1", b"b");
db.merge(b"k1", b"c");
db.merge(b"k1", b"d");
db.merge(b"k1", b"efg");
let m = db.merge(b"k1", b"h");
assert!(m.is_ok());
db.get(b"k1").map( |value| {
match value.to_utf8() {
Some(v) =>
println!("retrieved utf8 value: {}", v),
None =>
println!("did not read valid utf-8 out of the db"),
}
}).on_absent( || { println!("value not present!") })
.on_error( |e| { println!("error reading value")}); //: {}", e) });
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
db.close();
assert!(RocksDB::destroy(opts, path).is_ok());
}
} }

Loading…
Cancel
Save