initial implementation fragments for supporting merge operators

master
Tyler Neely 10 years ago
parent e585d23e27
commit 90ef2333c3
  1. 19
      src/ffi.rs
  2. 97
      src/rocksdb.rs

@ -113,25 +113,28 @@ extern {
pub fn rocksdb_repair_db( pub fn rocksdb_repair_db(
options: RocksDBOptions, path: *const i8, err: *mut i8); options: RocksDBOptions, path: *const i8, err: *mut i8);
// Merge Operator // Merge
pub fn rocksdb_merge(db: RocksDBInstance, writeopts: RocksDBWriteOptions,
k: *const u8, kLen: size_t,
v: *const u8, vLen: size_t, err: *mut i8);
pub fn rocksdb_mergeoperator_create( pub fn rocksdb_mergeoperator_create(
state: *mut c_void, state: *mut c_void,
destroy: extern fn(*mut c_void) -> (), destroy: extern fn(*mut c_void) -> (),
full_merge: extern fn ( full_merge: extern fn (
arg: *mut c_void, key: *const c_char, key_len: *mut size_t, arg: *mut c_void, key: *const c_char, key_len: *mut size_t,
existing_value: *const c_char, existing_value_len: *mut size_t, existing_value: *const c_char, existing_value_len: *mut size_t,
operands_list: &[*const c_char], operands_list_len: *const size_t, operands_list: &[*const c_char], operands_list_len: &[size_t],
num_operands: c_int, num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t success: *mut u8, new_value_length: *mut size_t
) -> *const c_char, ) -> *const c_char,
partial_merge: extern fn( partial_merge: extern fn(
*mut c_void, key: *const c_char, key_len: *mut size_t, arg: *mut c_void, key: *const c_char, key_len: *mut size_t,
operands_list: &[*const c_char], operands_list_len: *const size_t, operands_list: &[*const c_char], operands_list_len: &[size_t],
num_operands: c_int, num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t success: *mut u8, new_value_length: *mut size_t
) -> *const c_char, ) -> *const c_char,
delete_value: extern fn(*mut c_void, value: *const c_char, delete_value: Option<extern "C" fn(*mut c_void, value: *const c_char,
value_len: *mut size_t) -> (), value_len: *mut size_t) -> ()>,
name_fn: extern fn(*mut c_void) -> *const c_char, name_fn: extern fn(*mut c_void) -> *const c_char,
) -> RocksDBMergeOperator; ) -> RocksDBMergeOperator;
pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator); pub fn rocksdb_mergeoperator_destroy(mo: RocksDBMergeOperator);
@ -159,7 +162,6 @@ fn internal() {
let err = 0 as *mut i8; let err = 0 as *mut i8;
let db = rocksdb_open(opts, cpath_ptr, err); let db = rocksdb_open(opts, cpath_ptr, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
let writeopts = rocksdb_writeoptions_create(); let writeopts = rocksdb_writeoptions_create();
let RocksDBWriteOptions(write_opt_ptr) = writeopts; let RocksDBWriteOptions(write_opt_ptr) = writeopts;
@ -170,18 +172,15 @@ fn internal() {
rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err); rocksdb_put(db, writeopts, key.as_ptr(), 4, val.as_ptr(), 8, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
let readopts = rocksdb_readoptions_create(); let readopts = rocksdb_readoptions_create();
let RocksDBReadOptions(read_opts_ptr) = readopts; let RocksDBReadOptions(read_opts_ptr) = readopts;
assert!(read_opts_ptr.is_not_null()); assert!(read_opts_ptr.is_not_null());
libc::free(err as *mut c_void);
let val_len: size_t = 0; let val_len: size_t = 0;
let val_len_ptr = &val_len as *const size_t; let val_len_ptr = &val_len as *const size_t;
rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err); rocksdb_get(db, readopts, key.as_ptr(), 4, val_len_ptr, err);
assert!(err.is_null()); assert!(err.is_null());
libc::free(err as *mut c_void);
rocksdb_close(db); rocksdb_close(db);
} }
} }

@ -1,9 +1,10 @@
extern crate libc; extern crate libc;
use self::libc::{c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::io::{IoError}; use std::io::{IoError};
use std::c_vec::CVec; use std::c_vec::CVec;
use std::c_str::CString; use std::c_str::CString;
use std::str::from_utf8; use std::str::from_utf8;
use std::string::raw::from_buf_len;
use rocksdb_ffi; use rocksdb_ffi;
@ -121,6 +122,33 @@ impl RocksDB {
} }
} }
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> {
unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(),
value.len() as size_t, err);
if err.is_not_null() {
let cs = CString::new(err as *const i8, true);
match cs.as_str() {
Some(error_string) =>
return Err(error_string.to_string()),
None => {
let ie = IoError::last_error();
return Err(format!(
"ERROR: desc:{}, details:{}",
ie.desc,
ie.detail.unwrap_or_else(
|| {"none provided by OS".to_string()})))
}
}
}
return Ok(())
}
}
pub fn get<'a>(&self, key: &[u8]) -> pub fn get<'a>(&self, key: &[u8]) ->
RocksDBResult<'a, RocksDBVector, String> { RocksDBResult<'a, RocksDBVector, String> {
unsafe { unsafe {
@ -297,3 +325,70 @@ fn external() {
assert!(db.get(b"k1").is_none()); assert!(db.get(b"k1").is_none());
db.close(); db.close();
} }
extern "C" fn null_destructor(args: *mut c_void) {
println!("in null_destructor now");
}
extern "C" fn mo_name(args: *mut c_void) -> *const c_char {
"test_mo".to_c_str().as_ptr()
}
extern "C" fn full_merge(
arg: *mut c_void, key: *const c_char, key_len: *mut size_t,
existing_value: *const c_char, existing_value_len: *mut size_t,
operands_list: &[*const c_char], operands_list_len: &[size_t],
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the FULL merge operator right now!");
//println!("first opt len: {}", operands_list_len[0]);
let oldkey = from_buf_len(key as *const u8, key_len as uint);
let oldval = from_buf_len(existing_value as *const u8, existing_value_len as uint);
*new_value_length = 1;
*success = 1 as u8;
}
"2".to_c_str().as_ptr()
}
extern "C" fn partial_merge(
arg: *mut c_void, key: *const c_char, key_len: *mut size_t,
operands_list: &[*const c_char], operands_list_len: &[size_t],
num_operands: c_int,
success: *mut u8, new_value_length: *mut size_t) -> *const c_char {
unsafe {
println!("in the PARTIAL merge operator right now!");
*new_value_length = 2;
*success = 1 as u8;
}
"3".to_c_str().as_ptr()
}
#[allow(dead_code)]
#[zest]
fn mergetest() {
unsafe {
let opts = RocksDBOptions::new();
let mo = rocksdb_ffi::rocksdb_mergeoperator_create(
0 as *mut c_void,
null_destructor,
full_merge,
partial_merge,
None,
mo_name);
opts.create_if_missing(true);
opts.set_merge_operator(mo);
let db = RocksDB::open(opts, "externaltest").unwrap();
let p = db.put(b"k1", b"1");
//assert!(p.is_ok());
let m = db.merge(b"k1", b"1");
println!("after merge");
println!("k1's value is: {}", db.get(b"k1").unwrap().to_utf8().unwrap());
/*
assert!(m.is_ok());
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1");
assert!(r.unwrap().to_utf8().unwrap() == "2");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").is_none());
*/
db.close();
}
}

Loading…
Cancel
Save