ICE for 1/8/15 nightly

master
Tyler Neely 10 years ago
parent bd910578f2
commit 1e38a2433d
  1. 184
      src/rocksdb.rs

@ -16,12 +16,14 @@
extern crate libc; extern crate libc;
use self::libc::{c_char, c_int, c_void, size_t}; use self::libc::{c_char, c_int, c_void, size_t};
use std::io::{IoError}; use std::io::{IoError};
use std::c_vec::CVec; use std::ops::Deref;
use std::c_str::CString; use std::ptr::Unique;
use std::ffi::CString;
use std::str::from_utf8; use std::str::from_utf8;
use std::ptr; use std::ptr;
use std::mem; use std::mem;
use std::ptr::Unique; use std::slice;
use std::str::from_c_str;
use rocksdb_ffi; use rocksdb_ffi;
@ -73,10 +75,10 @@ impl RocksDBOptions {
pub fn add_merge_operator<'a>( &self, name: &str, pub fn add_merge_operator<'a>( &self, name: &str,
merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) { merge_fn: fn (&[u8], Option<&[u8]>, &mut MergeOperands) -> Vec<u8>) {
let cb = box MergeOperatorCallback { let cb = Box::new(MergeOperatorCallback {
name: name.to_c_str(), name: CString::from_slice(name.as_bytes()),
merge_fn: merge_fn, merge_fn: merge_fn,
}; });
unsafe { unsafe {
let mo = rocksdb_ffi::rocksdb_mergeoperator_create( let mo = rocksdb_ffi::rocksdb_mergeoperator_create(
@ -278,15 +280,15 @@ pub struct RocksDB {
} }
impl RocksDB { impl RocksDB {
pub fn open_default(path: &str) -> Result<RocksDB, String> { pub fn open_default(path: &str) -> Result<RocksDB, &str> {
let opts = RocksDBOptions::new(); let opts = RocksDBOptions::new();
opts.create_if_missing(true); opts.create_if_missing(true);
RocksDB::open(opts, path) RocksDB::open(opts, path)
} }
pub fn open(opts: RocksDBOptions, path: &str) -> Result<RocksDB, String> { pub fn open(opts: RocksDBOptions, path: &str) -> Result<RocksDB, &str> {
unsafe { unsafe {
let cpath = path.to_c_str(); let cpath = CString::from_slice(path.as_bytes());
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
//TODO test path here, as if rocksdb fails it will just crash the //TODO test path here, as if rocksdb fails it will just crash the
@ -295,26 +297,20 @@ impl RocksDB {
let err = 0 as *mut i8; let err = 0 as *mut i8;
let db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err); let db = rocksdb_ffi::rocksdb_open(opts.inner, cpath_ptr, err);
let rocksdb_ffi::RocksDBInstance(db_ptr) = db; let rocksdb_ffi::RocksDBInstance(db_ptr) = db;
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return Err(cs);
Some(error_string) =>
return Err(error_string.to_string()),
None =>
return Err(
"Could not initialize database.".to_string()),
}
} }
if db_ptr.is_null() { if db_ptr.is_null() {
return Err("Could not initialize database.".to_string()); return Err("Could not initialize database.");
} }
Ok(RocksDB{inner: db}) Ok(RocksDB{inner: db})
} }
} }
pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), String> { pub fn destroy(opts: RocksDBOptions, path: &str) -> Result<(), &str> {
unsafe { unsafe {
let cpath = path.to_c_str(); let cpath = CString::from_slice(path.as_bytes());
let cpath_ptr = cpath.as_ptr(); let cpath_ptr = cpath.as_ptr();
//TODO test path here, as if rocksdb fails it will just crash the //TODO test path here, as if rocksdb fails it will just crash the
@ -323,67 +319,39 @@ impl RocksDB {
let err = 0 as *mut i8; let err = 0 as *mut i8;
let result = rocksdb_ffi::rocksdb_destroy_db( let result = rocksdb_ffi::rocksdb_destroy_db(
opts.inner, cpath_ptr, err); opts.inner, cpath_ptr, err);
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return Err(cs);
Some(error_string) =>
return Err(error_string.to_string()),
None =>
return Err(
"Could not initialize database.".to_string()),
}
} }
Ok(()) Ok(())
} }
} }
pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), String> { pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(), rocksdb_ffi::rocksdb_put(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err); value.len() as size_t, err);
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return Err(cs);
Some(error_string) =>
return Err(error_string.to_string()),
None => {
let ie = IoError::last_error();
return Err(format!(
"ERROR: desc:{}, details:{}",
ie.desc,
ie.detail.unwrap_or_else(
|| {"none provided by OS".to_string()})))
}
}
} }
return Ok(()) return Ok(())
} }
} }
pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), String> { pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), &str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(), rocksdb_ffi::rocksdb_merge(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, value.as_ptr(), key.len() as size_t, value.as_ptr(),
value.len() as size_t, err); value.len() as size_t, err);
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return Err(cs);
Some(error_string) =>
return Err(error_string.to_string()),
None => {
let ie = IoError::last_error();
return Err(format!(
"ERROR: desc:{}, details:{}",
ie.desc,
ie.detail.unwrap_or_else(
|| {"none provided by OS".to_string()})))
}
}
} }
return Ok(()) return Ok(())
} }
@ -391,7 +359,7 @@ impl RocksDB {
pub fn get<'a>(&self, key: &[u8]) -> pub fn get<'a>(&self, key: &[u8]) ->
RocksDBResult<'a, RocksDBVector, String> { RocksDBResult<'a, RocksDBVector, &str> {
unsafe { unsafe {
let readopts = rocksdb_ffi::rocksdb_readoptions_create(); let readopts = rocksdb_ffi::rocksdb_readoptions_create();
let rocksdb_ffi::RocksDBReadOptions(read_opts_ptr) = readopts; let rocksdb_ffi::RocksDBReadOptions(read_opts_ptr) = readopts;
@ -399,7 +367,7 @@ impl RocksDB {
return RocksDBResult::Error("Unable to create rocksdb read \ return RocksDBResult::Error("Unable to create rocksdb read \
options. This is a fairly trivial call, and its failure \ options. This is a fairly trivial call, and its failure \
may be indicative of a mis-compiled or mis-loaded rocksdb \ may be indicative of a mis-compiled or mis-loaded rocksdb \
library.".to_string()); library.");
} }
let val_len: size_t = 0; let val_len: size_t = 0;
@ -407,16 +375,9 @@ impl RocksDB {
let err = 0 as *mut i8; let err = 0 as *mut i8;
let val = rocksdb_ffi::rocksdb_get(self.inner, readopts, let val = rocksdb_ffi::rocksdb_get(self.inner, readopts,
key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8; key.as_ptr(), key.len() as size_t, val_len_ptr, err) as *mut u8;
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return RocksDBResult::Error(cs);
Some(error_string) =>
return RocksDBResult::Error(error_string.to_string()),
None =>
return RocksDBResult::Error("Unable to get value from \
rocksdb. (non-utf8 error received from underlying \
library)".to_string()),
}
} }
match val.is_null() { match val.is_null() {
true => RocksDBResult::None, true => RocksDBResult::None,
@ -427,26 +388,15 @@ impl RocksDB {
} }
} }
pub fn delete(&self, key: &[u8]) -> Result<(),String> { pub fn delete(&self, key: &[u8]) -> Result<(),&str> {
unsafe { unsafe {
let writeopts = rocksdb_ffi::rocksdb_writeoptions_create(); let writeopts = rocksdb_ffi::rocksdb_writeoptions_create();
let err = 0 as *mut i8; let err = 0 as *mut i8;
rocksdb_ffi::rocksdb_delete(self.inner, writeopts, key.as_ptr(), rocksdb_ffi::rocksdb_delete(self.inner, writeopts, key.as_ptr(),
key.len() as size_t, err); key.len() as size_t, err);
if err.is_not_null() { if err != 0 as *mut i8 {
let cs = CString::new(err as *const i8, true); let cs = from_c_str(err as *const i8);
match cs.as_str() { return Err(cs);
Some(error_string) =>
return Err(error_string.to_string()),
None => {
let ie = IoError::last_error();
return Err(format!(
"ERROR: desc:{}, details:{}",
ie.desc,
ie.detail.unwrap_or_else(
|| {"none provided by OS".to_string()})))
}
}
} }
return Ok(()) return Ok(())
} }
@ -458,27 +408,37 @@ impl RocksDB {
} }
pub struct RocksDBVector { pub struct RocksDBVector {
inner: CVec<u8>, base: Unique<u8>,
len: usize,
}
impl Drop for RocksDBVector {
fn drop(&mut self) {
unsafe {
libc::free(self.base.0 as *mut libc::c_void);
}
}
} }
impl RocksDBVector { impl RocksDBVector {
pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector { pub fn from_c(val: *mut u8, val_len: size_t) -> RocksDBVector {
unsafe { unsafe {
let val = Unique(val); let base = Unique(val);
RocksDBVector { RocksDBVector {
inner: base: base,
CVec::new_with_dtor(val.0, val_len as uint, len: val_len as usize,
move |:| libc::free(val.0 as *mut libc::c_void))
} }
} }
} }
pub fn as_slice<'a>(&'a self) -> &'a [u8] { pub fn as_slice<'a>(&'a self) -> &'a [u8] {
self.inner.as_slice() unsafe {
slice::from_raw_buf(self.base.0 as *const u8, self.len)
}
} }
pub fn to_utf8(&self) -> Option<&str> { pub fn to_utf8(&self) -> Option<&str> {
from_utf8(self.inner.as_slice()).ok() from_utf8(self.as_slice()).ok()
} }
} }
@ -492,7 +452,7 @@ pub enum RocksDBResult<'a,T,E> {
} }
impl <'a,T,E> RocksDBResult<'a,T,E> { impl <'a,T,E> RocksDBResult<'a,T,E> {
pub fn map<U>(self, f: FnMut(T) -> U) -> RocksDBResult<'a,U,E> { pub fn map<U>(self, f: &mut FnOnce(T) -> U) -> RocksDBResult<'a,U,E> {
match self { match self {
RocksDBResult::Some(x) => RocksDBResult::Some(f(x)), RocksDBResult::Some(x) => RocksDBResult::Some(f(x)),
RocksDBResult::None => RocksDBResult::None, RocksDBResult::None => RocksDBResult::None,
@ -510,7 +470,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> {
} }
} }
pub fn on_error<U>(self, f: FnMut(E) -> U) -> RocksDBResult<'a,T,U> { pub fn on_error<U>(self, f: FnOnce(E) -> U) -> RocksDBResult<'a,T,U> {
match self { match self {
RocksDBResult::Some(x) => RocksDBResult::Some(x), RocksDBResult::Some(x) => RocksDBResult::Some(x),
RocksDBResult::None => RocksDBResult::None, RocksDBResult::None => RocksDBResult::None,
@ -518,7 +478,7 @@ impl <'a,T,E> RocksDBResult<'a,T,E> {
} }
} }
pub fn on_absent(self, f: FnMut() -> ()) -> RocksDBResult<'a,T,E> { pub fn on_absent(self, f: FnOnce() -> ()) -> RocksDBResult<'a,T,E> {
match self { match self {
RocksDBResult::Some(x) => RocksDBResult::Some(x), RocksDBResult::Some(x) => RocksDBResult::Some(x),
RocksDBResult::None => { RocksDBResult::None => {
@ -571,8 +531,8 @@ fn external() {
pub struct MergeOperands<'a> { pub struct MergeOperands<'a> {
operands_list: *const *const c_char, operands_list: *const *const c_char,
operands_list_len: *const size_t, operands_list_len: *const size_t,
num_operands: uint, num_operands: usize,
cursor: uint, cursor: usize,
} }
impl <'a> MergeOperands<'a> { impl <'a> MergeOperands<'a> {
@ -583,29 +543,30 @@ impl <'a> MergeOperands<'a> {
MergeOperands { MergeOperands {
operands_list: operands_list, operands_list: operands_list,
operands_list_len: operands_list_len, operands_list_len: operands_list_len,
num_operands: num_operands as uint, num_operands: num_operands as usize,
cursor: 0, cursor: 0,
} }
} }
} }
impl<'a> Iterator for &'a mut MergeOperands<'a> { impl<'a> Iterator for &'a mut MergeOperands<'a> {
type Item = &'a [u8];
fn next(&mut self) -> Option<&'a [u8]> { fn next(&mut self) -> Option<&'a [u8]> {
use std::raw::Slice; use std::raw::Slice;
match self.cursor == self.num_operands { match self.cursor == self.num_operands {
true => None, true => None,
false => { false => {
unsafe { unsafe {
let base = self.operands_list as uint; let base = self.operands_list as usize;
let base_len = self.operands_list_len as uint; let base_len = self.operands_list_len as usize;
let spacing = mem::size_of::<*const *const u8>(); let spacing = mem::size_of::<*const *const u8>();
let spacing_len = mem::size_of::<*const size_t>(); let spacing_len = mem::size_of::<*const size_t>();
let len_ptr = (base_len + (spacing_len * self.cursor)) let len_ptr = (base_len + (spacing_len * self.cursor))
as *const size_t; as *const size_t;
let len = *len_ptr as uint; let len = *len_ptr as usize;
let ptr = base + (spacing * self.cursor); let ptr = base + (spacing * self.cursor);
let op = String::from_raw_buf_len(*(ptr as *const *const u8), len); let op = String::from_utf8(slice::from_raw_buf(*(ptr as *const *const u8), len));
let des: Option<uint> = from_utf8(op.as_slice()); let des: Option<usize> = op.to_utf8();
self.cursor += 1; self.cursor += 1;
Some(mem::transmute(Slice{data:*(ptr as *const *const u8) Some(mem::transmute(Slice{data:*(ptr as *const *const u8)
as *const u8, len: len})) as *const u8, len: len}))
@ -614,7 +575,7 @@ impl<'a> Iterator for &'a mut MergeOperands<'a> {
} }
} }
fn size_hint(&self) -> (uint, Option<uint>) { fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.num_operands - self.cursor; let remaining = self.num_operands - self.cursor;
(remaining, Some(remaining)) (remaining, Some(remaining))
} }
@ -653,9 +614,9 @@ extern "C" fn full_merge_callback(
&mut MergeOperands::new(operands_list, &mut MergeOperands::new(operands_list,
operands_list_len, operands_list_len,
num_operands); num_operands);
let key = String::from_raw_buf_len(key as *const u8, key_len as uint); let key = String::from_utf8(slice::from_raw_buf(key as *const u8, key_len as usize));
let oldval = String::from_raw_buf_len(existing_value as *const u8, let oldval = String::from_utf8(slice::from_raw_buf(existing_value as *const u8,
existing_value_len as uint); existing_value_len as usize));
let mut result = let mut result =
(cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands); (cb.merge_fn)(key.as_bytes(), Some(oldval.as_bytes()), operands);
result.shrink_to_fit(); result.shrink_to_fit();
@ -686,7 +647,7 @@ extern "C" fn partial_merge_callback(
let operands = &mut MergeOperands::new(operands_list, let operands = &mut MergeOperands::new(operands_list,
operands_list_len, operands_list_len,
num_operands); num_operands);
let key = String::from_raw_buf_len(key as *const u8, key_len as uint); let key = String::from_utf8(slice::from_raw_buf(key as *const u8, key_len as usize));
let mut result = (cb.merge_fn)(key.as_bytes(), None, operands); let mut result = (cb.merge_fn)(key.as_bytes(), None, operands);
result.shrink_to_fit(); result.shrink_to_fit();
//TODO(tan) investigate zero-copy techniques to improve performance //TODO(tan) investigate zero-copy techniques to improve performance
@ -702,7 +663,8 @@ extern "C" fn partial_merge_callback(
fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>, fn test_provided_merge(new_key: &[u8], existing_val: Option<&[u8]>,
mut operands: &mut MergeOperands) -> Vec<u8> { mut operands: &mut MergeOperands) -> Vec<u8> {
let mut result: Vec<u8> = Vec::with_capacity(operands.size_hint().val0()); let nops = operands.size_hint().0();
let mut result: Vec<u8> = Vec::with_capacity(nops);
match existing_val { match existing_val {
Some(v) => result.push_all(v), Some(v) => result.push_all(v),
None => (), None => (),

Loading…
Cancel
Save