|
|
@ -13,6 +13,8 @@ |
|
|
|
See the License for the specific language governing permissions and |
|
|
|
See the License for the specific language governing permissions and |
|
|
|
limitations under the License. |
|
|
|
limitations under the License. |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
|
|
|
|
#![allow(unstable)] |
|
|
|
|
|
|
|
|
|
|
|
extern crate libc; |
|
|
|
extern crate libc; |
|
|
|
use self::libc::{c_char, c_int, c_void, size_t}; |
|
|
|
use self::libc::{c_char, c_int, c_void, size_t}; |
|
|
|
use std::io::{IoError}; |
|
|
|
use std::io::{IoError}; |
|
|
@ -297,7 +299,7 @@ impl RocksDB { |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); |
|
|
|
let db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); |
|
|
|
let rocksdb_ffi::RocksDBInstance(db_ptr) = db; |
|
|
|
let rocksdb_ffi::RocksDBInstance(db_ptr) = db; |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return Err(cs); |
|
|
|
return Err(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -319,7 +321,7 @@ impl RocksDB { |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let result = rocksdb_ffi::rocksdb_destroy_db( |
|
|
|
let result = rocksdb_ffi::rocksdb_destroy_db( |
|
|
|
opts.inner, cpath_ptr, err); |
|
|
|
opts.inner, cpath_ptr, err); |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return Err(cs); |
|
|
|
return Err(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -334,7 +336,7 @@ impl RocksDB { |
|
|
|
rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(), |
|
|
|
rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(), |
|
|
|
key.len() as size_t, value.as_ptr(), |
|
|
|
key.len() as size_t, value.as_ptr(), |
|
|
|
value.len() as size_t, err); |
|
|
|
value.len() as size_t, err); |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return Err(cs); |
|
|
|
return Err(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -349,7 +351,7 @@ impl RocksDB { |
|
|
|
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), |
|
|
|
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), |
|
|
|
key.len() as size_t, value.as_ptr(), |
|
|
|
key.len() as size_t, value.as_ptr(), |
|
|
|
value.len() as size_t, err); |
|
|
|
value.len() as size_t, err); |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return Err(cs); |
|
|
|
return Err(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -375,7 +377,7 @@ impl RocksDB { |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts, |
|
|
|
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts, |
|
|
|
key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; |
|
|
|
key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return RocksDBResult::Error(cs); |
|
|
|
return RocksDBResult::Error(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -394,7 +396,7 @@ impl RocksDB { |
|
|
|
let err = 0 as *mut i8; |
|
|
|
let err = 0 as *mut i8; |
|
|
|
rocksdb_ffi::rocksdb_delete(self.inner, writeopts, key.as_ptr(), |
|
|
|
rocksdb_ffi::rocksdb_delete(self.inner, writeopts, key.as_ptr(), |
|
|
|
key.len() as size_t, err); |
|
|
|
key.len() as size_t, err); |
|
|
|
if err != 0 as *mut i8 { |
|
|
|
if !err.is_null() { |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
let cs = from_c_str(err as *const i8); |
|
|
|
return Err(cs); |
|
|
|
return Err(cs); |
|
|
|
} |
|
|
|
} |
|
|
@ -412,6 +414,13 @@ pub struct RocksDBVector { |
|
|
|
len: usize, |
|
|
|
len: usize, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
impl Deref for RocksDBVector { |
|
|
|
|
|
|
|
type Target = [u8]; |
|
|
|
|
|
|
|
fn deref(&self) -> &[u8] { |
|
|
|
|
|
|
|
unsafe { slice::from_raw_mut_buf(&self.base.0, self.len) } |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl Drop for RocksDBVector { |
|
|
|
impl Drop for RocksDBVector { |
|
|
|
fn drop(&mut self) { |
|
|
|
fn drop(&mut self) { |
|
|
|
unsafe { |
|
|
|
unsafe { |
|
|
@ -431,14 +440,8 @@ impl RocksDBVector { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn as_slice<'a>(&'a self) -> &'a [u8] { |
|
|
|
pub fn to_utf8<'a>(&'a self) -> Option<&'a str> { |
|
|
|
unsafe { |
|
|
|
from_utf8(self.deref()).ok() |
|
|
|
slice::from_raw_buf(self.base.0 as *const u8, self.len) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pub fn to_utf8(&self) -> Option<&str> { |
|
|
|
|
|
|
|
from_utf8(self.as_slice()).ok() |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
@ -452,7 +455,7 @@ pub enum RocksDBResult<'a,T,E> { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
impl <'a,T,E> RocksDBResult<'a,T,E> { |
|
|
|
impl <'a,T,E> RocksDBResult<'a,T,E> { |
|
|
|
pub fn map<U>(self, f: &mut FnOnce(T) -> U) -> RocksDBResult<'a,U,E> { |
|
|
|
pub fn map<U, F: FnOnce(T) -> U>(self, f: F) -> RocksDBResult<'a,U,E> { |
|
|
|
match self { |
|
|
|
match self { |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(f(x)), |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(f(x)), |
|
|
|
RocksDBResult::None => RocksDBResult::None, |
|
|
|
RocksDBResult::None => RocksDBResult::None, |
|
|
@ -470,7 +473,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn on_error<U>(self, f: FnOnce(E) -> U) -> RocksDBResult<'a,T,U> { |
|
|
|
pub fn on_error<U, F: FnOnce(E) -> U>(self, f: F) -> RocksDBResult<'a,T,U> { |
|
|
|
match self { |
|
|
|
match self { |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(x), |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(x), |
|
|
|
RocksDBResult::None => RocksDBResult::None, |
|
|
|
RocksDBResult::None => RocksDBResult::None, |
|
|
@ -478,7 +481,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
pub fn on_absent(self, f: FnOnce() -> ()) -> RocksDBResult<'a,T,E> { |
|
|
|
pub fn on_absent<F: FnOnce()->()>(self, f: F) -> RocksDBResult<'a,T,E> { |
|
|
|
match self { |
|
|
|
match self { |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(x), |
|
|
|
RocksDBResult::Some(x) => RocksDBResult::Some(x), |
|
|
|
RocksDBResult::None => { |
|
|
|
RocksDBResult::None => { |
|
|
@ -519,7 +522,7 @@ fn external() { |
|
|
|
let db = RocksDB::open_default(path).unwrap(); |
|
|
|
let db = RocksDB::open_default(path).unwrap(); |
|
|
|
let p = db.put(b"k1", b"v1111"); |
|
|
|
let p = db.put(b"k1", b"v1111"); |
|
|
|
assert!(p.is_ok()); |
|
|
|
assert!(p.is_ok()); |
|
|
|
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); |
|
|
|
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1"); |
|
|
|
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); |
|
|
|
assert!(r.unwrap().to_utf8().unwrap() == "v1111"); |
|
|
|
assert!(db.delete(b"k1").is_ok()); |
|
|
|
assert!(db.delete(b"k1").is_ok()); |
|
|
|
assert!(db.get(b"k1").is_none()); |
|
|
|
assert!(db.get(b"k1").is_none()); |
|
|
@ -565,8 +568,6 @@ impl<'a> Iterator for &'a mut MergeOperands<'a> { |
|
|
|
as *const size_t; |
|
|
|
as *const size_t; |
|
|
|
let len = *len_ptr as usize; |
|
|
|
let len = *len_ptr as usize; |
|
|
|
let ptr = base + (spacing * self.cursor); |
|
|
|
let ptr = base + (spacing * self.cursor); |
|
|
|
let op = String::from_utf8(slice::from_raw_buf(*(ptr as *const *const u8), len)); |
|
|
|
|
|
|
|
let des: Option<usize> = op.to_utf8(); |
|
|
|
|
|
|
|
self.cursor += 1; |
|
|
|
self.cursor += 1; |
|
|
|
Some(mem::transmute(Slice{data:*(ptr as *const *const u8) |
|
|
|
Some(mem::transmute(Slice{data:*(ptr as *const *const u8) |
|
|
|
as *const u8, len: len})) |
|
|
|
as *const u8, len: len})) |
|
|
@ -602,7 +603,7 @@ extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
extern "C" fn full_merge_callback( |
|
|
|
extern "C" fn full_merge_callback( |
|
|
|
raw_cb: *mut c_void, key: *const c_char, key_len: size_t, |
|
|
|
raw_cb: *mut c_void, raw_key: *const c_char, key_len: size_t, |
|
|
|
existing_value: *const c_char, existing_value_len: size_t, |
|
|
|
existing_value: *const c_char, existing_value_len: size_t, |
|
|
|
operands_list: *const *const c_char, operands_list_len: *const size_t, |
|
|
|
operands_list: *const *const c_char, operands_list_len: *const size_t, |
|
|
|
num_operands: c_int, |
|
|
|
num_operands: c_int, |
|
|
@ -614,20 +615,15 @@ extern "C" fn full_merge_callback( |
|
|
|
&mut MergeOperands::new(operands_list, |
|
|
|
&mut MergeOperands::new(operands_list, |
|
|
|
operands_list_len, |
|
|
|
operands_list_len, |
|
|
|
num_operands); |
|
|
|
num_operands); |
|
|
|
let key = String::from_utf8(slice::from_raw_buf(key as *const u8, key_len as usize)); |
|
|
|
let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize)); |
|
|
|
let oldval = String::from_utf8(slice::from_raw_buf(existing_value as *const u8, |
|
|
|
let oldval: &[u8] = mem::transmute(slice::from_raw_buf(&existing_value, |
|
|
|
existing_value_len as usize)); |
|
|
|
existing_value_len as usize)); |
|
|
|
let mut result = |
|
|
|
let mut result = |
|
|
|
(cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); |
|
|
|
(cb.merge_fn)(key, Some(oldval), operands); |
|
|
|
result.shrink_to_fit(); |
|
|
|
result.shrink_to_fit(); |
|
|
|
/* |
|
|
|
|
|
|
|
let ptr = result.as_ptr(); |
|
|
|
|
|
|
|
mem::forget(result); |
|
|
|
|
|
|
|
ptr as *const c_char |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
//TODO(tan) investigate zero-copy techniques to improve performance
|
|
|
|
//TODO(tan) investigate zero-copy techniques to improve performance
|
|
|
|
let buf = libc::malloc(result.len() as size_t); |
|
|
|
let buf = libc::malloc(result.len() as size_t); |
|
|
|
assert!(buf.is_not_null()); |
|
|
|
assert!(!buf.is_null()); |
|
|
|
*new_value_length = result.len() as size_t; |
|
|
|
*new_value_length = result.len() as size_t; |
|
|
|
*success = 1 as u8; |
|
|
|
*success = 1 as u8; |
|
|
|
ptr::copy_memory(&mut *buf, result.as_ptr() |
|
|
|
ptr::copy_memory(&mut *buf, result.as_ptr() |
|
|
@ -637,7 +633,7 @@ extern "C" fn full_merge_callback( |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
extern "C" fn partial_merge_callback( |
|
|
|
extern "C" fn partial_merge_callback( |
|
|
|
raw_cb: *mut c_void, key: *const c_char, key_len: size_t, |
|
|
|
raw_cb: *mut c_void, raw_key: *const c_char, key_len: size_t, |
|
|
|
operands_list: *const *const c_char, operands_list_len: *const size_t, |
|
|
|
operands_list: *const *const c_char, operands_list_len: *const size_t, |
|
|
|
num_operands: c_int, |
|
|
|
num_operands: c_int, |
|
|
|
success: *mut u8, new_value_length: *mut size_t) -> *const c_char { |
|
|
|
success: *mut u8, new_value_length: *mut size_t) -> *const c_char { |
|
|
@ -647,12 +643,12 @@ extern "C" fn partial_merge_callback( |
|
|
|
let operands = &mut MergeOperands::new(operands_list, |
|
|
|
let operands = &mut MergeOperands::new(operands_list, |
|
|
|
operands_list_len, |
|
|
|
operands_list_len, |
|
|
|
num_operands); |
|
|
|
num_operands); |
|
|
|
let key = String::from_utf8(slice::from_raw_buf(key as *const u8, key_len as usize)); |
|
|
|
let key: &[u8] = mem::transmute(slice::from_raw_buf(&raw_key, key_len as usize)); |
|
|
|
let mut result = (cb.merge_fn)(key.as_bytes(), None, operands); |
|
|
|
let mut result = (cb.merge_fn)(key, None, operands); |
|
|
|
result.shrink_to_fit(); |
|
|
|
result.shrink_to_fit(); |
|
|
|
//TODO(tan) investigate zero-copy techniques to improve performance
|
|
|
|
//TODO(tan) investigate zero-copy techniques to improve performance
|
|
|
|
let buf = libc::malloc(result.len() as size_t); |
|
|
|
let buf = libc::malloc(result.len() as size_t); |
|
|
|
assert!(buf.is_not_null()); |
|
|
|
assert!(!buf.is_null()); |
|
|
|
*new_value_length = 1 as size_t; |
|
|
|
*new_value_length = 1 as size_t; |
|
|
|
*success = 1 as u8; |
|
|
|
*success = 1 as u8; |
|
|
|
ptr::copy_memory(&mut *buf, result.as_ptr() |
|
|
|
ptr::copy_memory(&mut *buf, result.as_ptr() |
|
|
@ -663,7 +659,7 @@ extern "C" fn partial_merge_callback( |
|
|
|
|
|
|
|
|
|
|
|
fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>, |
|
|
|
fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>, |
|
|
|
mut operands: &mut MergeOperands) -> Vec<u8> { |
|
|
|
mut operands: &mut MergeOperands) -> Vec<u8> { |
|
|
|
let nops = operands.size_hint().0(); |
|
|
|
let nops = operands.size_hint().0; |
|
|
|
let mut result: Vec<u8> = Vec::with_capacity(nops); |
|
|
|
let mut result: Vec<u8> = Vec::with_capacity(nops); |
|
|
|
match existing_val { |
|
|
|
match existing_val { |
|
|
|
Some(v) => result.push_all(v), |
|
|
|
Some(v) => result.push_all(v), |
|
|
@ -679,35 +675,33 @@ fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>, |
|
|
|
#[test] |
|
|
|
#[test] |
|
|
|
fn mergetest() { |
|
|
|
fn mergetest() { |
|
|
|
let path = "_rust_rocksdb_mergetest"; |
|
|
|
let path = "_rust_rocksdb_mergetest"; |
|
|
|
unsafe { |
|
|
|
let opts = RocksDBOptions::new(); |
|
|
|
let opts = RocksDBOptions::new(); |
|
|
|
opts.create_if_missing(true); |
|
|
|
opts.create_if_missing(true); |
|
|
|
opts.add_merge_operator("test operator", test_provided_merge); |
|
|
|
opts.add_merge_operator("test operator", test_provided_merge); |
|
|
|
let db = RocksDB::open(opts, path).unwrap(); |
|
|
|
let db = RocksDB::open(opts, path).unwrap(); |
|
|
|
let p = db.put(b"k1", b"a"); |
|
|
|
let p = db.put(b"k1", b"a"); |
|
|
|
assert!(p.is_ok()); |
|
|
|
assert!(p.is_ok()); |
|
|
|
db.merge(b"k1", b"b"); |
|
|
|
db.merge(b"k1", b"b"); |
|
|
|
db.merge(b"k1", b"c"); |
|
|
|
db.merge(b"k1", b"c"); |
|
|
|
db.merge(b"k1", b"d"); |
|
|
|
db.merge(b"k1", b"d"); |
|
|
|
db.merge(b"k1", b"efg"); |
|
|
|
db.merge(b"k1", b"efg"); |
|
|
|
let m = db.merge(b"k1", b"h"); |
|
|
|
let m = db.merge(b"k1", b"h"); |
|
|
|
assert!(m.is_ok()); |
|
|
|
assert!(m.is_ok()); |
|
|
|
db.get(b"k1").map( |value| { |
|
|
|
db.get(b"k1").map( |value| { |
|
|
|
match value.to_utf8() { |
|
|
|
match value.to_utf8() { |
|
|
|
Some(v) => |
|
|
|
Some(v) => |
|
|
|
println!("retrieved utf8 value: {}", v), |
|
|
|
println!("retrieved utf8 value: {}", v), |
|
|
|
None => |
|
|
|
None => |
|
|
|
println!("did not read valid utf-8 out of the db"), |
|
|
|
println!("did not read valid utf-8 out of the db"), |
|
|
|
} |
|
|
|
} |
|
|
|
}).on_absent( || { println!("value not present!") }) |
|
|
|
}).on_absent( || { println!("value not present!") }) |
|
|
|
.on_error( |e| { println!("error reading value")}); //: {", e) });
|
|
|
|
.on_error( |e| { println!("error reading value")}); //: {", e) });
|
|
|
|
|
|
|
|
|
|
|
|
assert!(m.is_ok()); |
|
|
|
assert!(m.is_ok()); |
|
|
|
let r: RocksDBResult<RocksDBVector, &str> = db.get(b"k1"); |
|
|
|
let r: RocksDBResult<RocksDBVector, String> = db.get(b"k1"); |
|
|
|
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); |
|
|
|
assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); |
|
|
|
assert!(db.delete(b"k1").is_ok()); |
|
|
|
assert!(db.delete(b"k1").is_ok()); |
|
|
|
assert!(db.get(b"k1").is_none()); |
|
|
|
assert!(db.get(b"k1").is_none()); |
|
|
|
db.close(); |
|
|
|
db.close(); |
|
|
|
assert!(RocksDB::destroy(opts, path).is_ok()); |
|
|
|
assert!(RocksDB::destroy(opts, path).is_ok()); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|