merging benesch's work and more recent spacejam master

master
Rick Richardson 7 years ago
commit 3377a30391
  1. 4
      .travis.yml
  2. 1
      librocksdb-sys/Cargo.toml
  3. 18
      librocksdb-sys/build.rs
  4. 1019
      librocksdb-sys/src/lib.rs
  5. 24
      src/backup.rs
  6. 38
      src/compaction_filter.rs
  7. 13
      src/comparator.rs
  8. 535
      src/db.rs
  9. 73
      src/db_options.rs
  10. 7
      src/lib.rs
  11. 66
      src/merge_operator.rs

@ -10,8 +10,12 @@ addons:
apt:
sources:
- ubuntu-toolchain-r-test
- llvm-toolchain-trusty
packages:
- g++-5
- llvm-3.9-dev
- libclang-3.9-dev
- clang-3.9
script:
- cargo test --manifest-path=librocksdb-sys/Cargo.toml

@ -24,3 +24,4 @@ const-cstr = "0.2"
[build-dependencies]
gcc = { version = "0.3", features = ["parallel"] }
make-cmd = "0.1"
bindgen = "0.29"

@ -1,6 +1,9 @@
extern crate gcc;
extern crate bindgen;
use std::env;
use std::fs;
use std::path::PathBuf;
fn link(name: &str, bundled: bool) {
use std::env::var;
@ -30,7 +33,18 @@ fn build_rocksdb() {
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=rocksdb/");
let mut config = gcc::Config::new();
let bindings = bindgen::Builder::default()
.header("rocksdb/include/rocksdb/c.h")
.ctypes_prefix("libc")
.generate()
.expect("unable to generate rocksdb bindings");
let out_path = PathBuf::from(env::var("OUT_DIR").unwrap());
bindings
.write_to_file(out_path.join("bindings.rs"))
.expect("unable to write rocksdb bindings");
let mut config = gcc::Build::new();
config.include("rocksdb/include/");
config.include("rocksdb/");
config.include("rocksdb/third-party/gtest-1.7.0/fused-src/");
@ -115,7 +129,7 @@ fn build_rocksdb() {
}
fn build_snappy() {
let mut config = gcc::Config::new();
let mut config = gcc::Build::new();
config.include("snappy/");
config.include(".");

File diff suppressed because it is too large Load Diff

@ -35,16 +35,19 @@ pub struct RestoreOptions {
impl BackupEngine {
/// Open a backup engine with the specified options.
pub fn open<P: AsRef<Path>>(opts: &BackupEngineOptions,
path: P)
-> Result<BackupEngine, Error> {
pub fn open<P: AsRef<Path>>(
opts: &BackupEngineOptions,
path: P,
) -> Result<BackupEngine, Error> {
let path = path.as_ref();
let cpath = match CString::new(path.to_string_lossy().as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err(Error::new("Failed to convert path to CString \
return Err(Error::new(
"Failed to convert path to CString \
when opening backup engine"
.to_owned()))
.to_owned(),
))
}
};
@ -60,15 +63,20 @@ impl BackupEngine {
pub fn create_new_backup(&mut self, db: &DB) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_backup_engine_create_new_backup(self.inner, db.inner));
ffi_try!(ffi::rocksdb_backup_engine_create_new_backup(
self.inner,
db.inner,
));
Ok(())
}
}
pub fn purge_old_backups(&mut self, num_backups_to_keep: usize) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_backup_engine_purge_old_backups(self.inner,
num_backups_to_keep as uint32_t));
ffi_try!(ffi::rocksdb_backup_engine_purge_old_backups(
self.inner,
num_backups_to_keep as uint32_t,
));
Ok(())
}
}

@ -43,42 +43,48 @@ pub enum Decision {
/// [set_compaction_filter]: ../struct.Options.html#method.set_compaction_filter
pub trait CompactionFilterFn: FnMut(u32, &[u8], &[u8]) -> Decision {}
impl<F> CompactionFilterFn for F
where F: FnMut(u32, &[u8], &[u8]) -> Decision,
F: Send + 'static
where
F: FnMut(u32, &[u8], &[u8]) -> Decision,
F: Send + 'static,
{
}
pub struct CompactionFilterCallback<F>
where F: CompactionFilterFn
where
F: CompactionFilterFn,
{
pub name: CString,
pub filter_fn: F,
}
pub unsafe extern "C" fn destructor_callback<F>(raw_cb: *mut c_void)
where F: CompactionFilterFn
where
F: CompactionFilterFn,
{
let _: Box<CompactionFilterCallback<F>> = mem::transmute(raw_cb);
}
pub unsafe extern "C" fn name_callback<F>(raw_cb: *mut c_void) -> *const c_char
where F: CompactionFilterFn
where
F: CompactionFilterFn,
{
let cb = &*(raw_cb as *mut CompactionFilterCallback<F>);
cb.name.as_ptr()
}
pub unsafe extern "C" fn filter_callback<F>(raw_cb: *mut c_void,
level: c_int,
raw_key: *const c_char,
key_length: size_t,
existing_value: *const c_char,
value_length: size_t,
new_value: *mut *mut c_char,
new_value_length: *mut size_t,
value_changed: *mut c_uchar)
-> c_uchar
where F: CompactionFilterFn
pub unsafe extern "C" fn filter_callback<F>(
raw_cb: *mut c_void,
level: c_int,
raw_key: *const c_char,
key_length: size_t,
existing_value: *const c_char,
value_length: size_t,
new_value: *mut *mut c_char,
new_value_length: *mut size_t,
value_changed: *mut c_uchar,
) -> c_uchar
where
F: CompactionFilterFn,
{
use self::Decision::*;

@ -37,12 +37,13 @@ pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
ptr as *const c_char
}
pub unsafe extern "C" fn compare_callback(raw_cb: *mut c_void,
a_raw: *const c_char,
a_len: size_t,
b_raw: *const c_char,
b_len: size_t)
-> c_int {
pub unsafe extern "C" fn compare_callback(
raw_cb: *mut c_void,
a_raw: *const c_char,
a_len: size_t,
b_raw: *const c_char,
b_len: size_t,
) -> c_int {
let cb: &mut ComparatorCallback = &mut *(raw_cb as *mut ComparatorCallback);
let a: &[u8] = slice::from_raw_parts(a_raw as *const u8, a_len as usize);
let b: &[u8] = slice::from_raw_parts(b_raw as *const u8, b_len as usize);

@ -55,10 +55,10 @@ pub enum DBCompactionStyle {
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum DBRecoveryMode {
TolerateCorruptedTailRecords = ffi::rocksdb_recovery_mode_tolerate_corrupted_tail_records as isize,
AbsoluteConsistency = ffi::rocksdb_recovery_mode_absolute_consistency as isize,
PointInTime = ffi::rocksdb_recovery_mode_point_in_time as isize,
SkipAnyCorruptedRecord = ffi::rocksdb_recovery_mode_skip_any_corrupted_record as isize,
TolerateCorruptedTailRecords = ffi::rocksdb_tolerate_corrupted_tail_records_recovery as isize,
AbsoluteConsistency = ffi::rocksdb_absolute_consistency_recovery as isize,
PointInTime = ffi::rocksdb_point_in_time_recovery as isize,
SkipAnyCorruptedRecord = ffi::rocksdb_skip_any_corrupted_records_recovery as isize,
}
/// An atomic batch of write operations.
@ -197,17 +197,14 @@ pub enum IteratorMode<'a> {
impl DBRawIterator {
fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator {
unsafe {
DBRawIterator {
inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner),
}
}
unsafe { DBRawIterator { inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner) } }
}
fn new_cf(db: &DB,
cf_handle: ColumnFamily,
readopts: &ReadOptions)
-> Result<DBRawIterator, Error> {
fn new_cf(
db: &DB,
cf_handle: ColumnFamily,
readopts: &ReadOptions,
) -> Result<DBRawIterator, Error> {
unsafe {
Ok(DBRawIterator {
inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner),
@ -251,7 +248,9 @@ impl DBRawIterator {
/// }
/// ```
pub fn seek_to_first(&mut self) {
unsafe { ffi::rocksdb_iter_seek_to_first(self.inner); }
unsafe {
ffi::rocksdb_iter_seek_to_first(self.inner);
}
}
/// Seeks to the last key in the database.
@ -285,7 +284,9 @@ impl DBRawIterator {
/// }
/// ```
pub fn seek_to_last(&mut self) {
unsafe { ffi::rocksdb_iter_seek_to_last(self.inner); }
unsafe {
ffi::rocksdb_iter_seek_to_last(self.inner);
}
}
/// Seeks to the specified key or the first key that lexicographically follows it.
@ -312,7 +313,13 @@ impl DBRawIterator {
/// }
/// ```
pub fn seek(&mut self, key: &[u8]) {
unsafe { ffi::rocksdb_iter_seek(self.inner, key.as_ptr() as *const c_char, key.len() as size_t); }
unsafe {
ffi::rocksdb_iter_seek(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
);
}
}
/// Seeks to the specified key, or the first key that lexicographically precedes it.
@ -339,21 +346,31 @@ impl DBRawIterator {
/// // There are no keys in the database
/// }
pub fn seek_for_prev(&mut self, key: &[u8]) {
unsafe { ffi::rocksdb_iter_seek_for_prev(self.inner, key.as_ptr() as *const c_char, key.len() as size_t); }
unsafe {
ffi::rocksdb_iter_seek_for_prev(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
);
}
}
/// Seeks to the next key.
///
/// Returns true if the iterator is valid after this operation.
pub fn next(&mut self) {
unsafe { ffi::rocksdb_iter_next(self.inner); }
unsafe {
ffi::rocksdb_iter_next(self.inner);
}
}
/// Seeks to the previous key.
///
/// Returns true if the iterator is valid after this operation.
pub fn prev(&mut self) {
unsafe { ffi::rocksdb_iter_prev(self.inner); }
unsafe {
ffi::rocksdb_iter_prev(self.inner);
}
}
/// Returns a slice to the internal buffer storing the current key.
@ -377,9 +394,7 @@ impl DBRawIterator {
/// Returns a copy of the current key.
pub fn key(&self) -> Option<Vec<u8>> {
unsafe {
self.key_inner().map(|key| key.to_vec())
}
unsafe { self.key_inner().map(|key| key.to_vec()) }
}
/// Returns a slice to the internal buffer storing the current value.
@ -403,9 +418,7 @@ impl DBRawIterator {
/// Returns a copy of the current value.
pub fn value(&self) -> Option<Vec<u8>> {
unsafe {
self.value_inner().map(|value| value.to_vec())
}
unsafe { self.value_inner().map(|value| value.to_vec()) }
}
}
@ -428,11 +441,12 @@ impl DBIterator {
rv
}
fn new_cf(db: &DB,
cf_handle: ColumnFamily,
readopts: &ReadOptions,
mode: IteratorMode)
-> Result<DBIterator, Error> {
fn new_cf(
db: &DB,
cf_handle: ColumnFamily,
readopts: &ReadOptions,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let mut rv = DBIterator {
raw: try!(DBRawIterator::new_cf(db, cf_handle, readopts)),
direction: Direction::Forward, // blown away by set_mode()
@ -484,7 +498,10 @@ impl Iterator for DBIterator {
if self.raw.valid() {
// .key() and .value() only ever return None if valid == false, which we've just cheked
Some((self.raw.key().unwrap().into_boxed_slice(), self.raw.value().unwrap().into_boxed_slice()))
Some((
self.raw.key().unwrap().into_boxed_slice(),
self.raw.value().unwrap().into_boxed_slice(),
))
} else {
None
}
@ -512,10 +529,11 @@ impl<'a> Snapshot<'a> {
DBIterator::new(self.db, &readopts, mode)
}
pub fn iterator_cf(&self,
cf_handle: ColumnFamily,
mode: IteratorMode)
-> Result<DBIterator, Error> {
pub fn iterator_cf(
&self,
cf_handle: ColumnFamily,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
DBIterator::new_cf(self.db, cf_handle, &readopts, mode)
@ -527,9 +545,7 @@ impl<'a> Snapshot<'a> {
DBRawIterator::new(self.db, &readopts)
}
pub fn raw_iterator_cf(&self,
cf_handle: ColumnFamily)
-> Result<DBRawIterator, Error> {
pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
DBRawIterator::new_cf(self.db, cf_handle, &readopts)
@ -541,10 +557,7 @@ impl<'a> Snapshot<'a> {
self.db.get_opt(key, &readopts)
}
pub fn get_cf(&self,
cf: ColumnFamily,
key: &[u8])
-> Result<Option<DBVector>, Error> {
pub fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<DBVector>, Error> {
let mut readopts = ReadOptions::default();
readopts.set_snapshot(self);
self.db.get_cf_opt(cf, key, &readopts)
@ -584,16 +597,20 @@ impl DB {
let cpath = match CString::new(path.to_string_lossy().as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err(Error::new("Failed to convert path to CString \
return Err(Error::new(
"Failed to convert path to CString \
when opening DB."
.to_owned()))
.to_owned(),
))
}
};
if let Err(e) = fs::create_dir_all(&path) {
return Err(Error::new(format!("Failed to create RocksDB\
return Err(Error::new(format!(
"Failed to create RocksDB\
directory: `{:?}`.",
e)));
e
)));
}
let db: *mut ffi::rocksdb_t;
@ -612,34 +629,40 @@ impl DB {
// We need to store our CStrings in an intermediate vector
// so that their pointers remain valid.
let c_cfs: Vec<CString> = cfs_v.iter()
let c_cfs: Vec<CString> = cfs_v
.iter()
.map(|cf| CString::new(cf.as_bytes()).unwrap())
.collect();
let cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
let mut cfnames: Vec<_> = c_cfs.iter().map(|cf| cf.as_ptr()).collect();
// These handles will be populated by DB.
let mut cfhandles: Vec<_> = cfs_v.iter().map(|_| ptr::null_mut()).collect();
// TODO(tyler) allow options to be passed in.
let cfopts: Vec<_> = cfs_v.iter()
let mut cfopts: Vec<_> = cfs_v
.iter()
.map(|_| unsafe { ffi::rocksdb_options_create() as *const _ })
.collect();
unsafe {
db = ffi_try!(ffi::rocksdb_open_column_families(opts.inner,
cpath.as_ptr() as *const _,
cfs_v.len() as c_int,
cfnames.as_ptr() as *const _,
cfopts.as_ptr(),
cfhandles.as_mut_ptr()));
db = ffi_try!(ffi::rocksdb_open_column_families(
opts.inner,
cpath.as_ptr(),
cfs_v.len() as c_int,
cfnames.as_mut_ptr(),
cfopts.as_mut_ptr(),
cfhandles.as_mut_ptr(),
));
}
for handle in &cfhandles {
if handle.is_null() {
return Err(Error::new("Received null column family \
return Err(Error::new(
"Received null column family \
handle from DB."
.to_owned()));
.to_owned(),
));
}
}
@ -664,22 +687,27 @@ impl DB {
let cpath = match CString::new(path.to_string_lossy().as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err(Error::new("Failed to convert path to CString \
return Err(Error::new(
"Failed to convert path to CString \
when opening DB."
.to_owned()))
.to_owned(),
))
}
};
let mut length = 0;
unsafe {
let ptr = ffi_try!(ffi::rocksdb_list_column_families(opts.inner,
cpath.as_ptr() as *const _,
&mut length));
let vec = Vec::from_raw_parts(ptr, length, length).iter().map(|&ptr| {
CString::from_raw(ptr).into_string().unwrap()
}).collect();
let ptr = ffi_try!(ffi::rocksdb_list_column_families(
opts.inner,
cpath.as_ptr() as *const _,
&mut length,
));
let vec = Vec::from_raw_parts(ptr, length, length)
.iter()
.map(|&ptr| CString::from_raw(ptr).into_string().unwrap())
.collect();
Ok(vec)
}
}
@ -724,21 +752,25 @@ impl DB {
pub fn get_opt(&self, key: &[u8], readopts: &ReadOptions) -> Result<Option<DBVector>, Error> {
if readopts.inner.is_null() {
return Err(Error::new("Unable to create RocksDB read options. \
return Err(Error::new(
"Unable to create RocksDB read options. \
This is a fairly trivial call, and its \
failure may be indicative of a \
mis-compiled or mis-loaded RocksDB \
library."
.to_owned()));
.to_owned(),
));
}
unsafe {
let mut val_len: size_t = 0;
let val = ffi_try!(ffi::rocksdb_get(self.inner,
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len)) as *mut u8;
let val = ffi_try!(ffi::rocksdb_get(
self.inner,
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len,
)) as *mut u8;
if val.is_null() {
Ok(None)
} else {
@ -752,28 +784,33 @@ impl DB {
self.get_opt(key, &ReadOptions::default())
}
pub fn get_cf_opt(&self,
cf: ColumnFamily,
key: &[u8],
readopts: &ReadOptions)
-> Result<Option<DBVector>, Error> {
pub fn get_cf_opt(
&self,
cf: ColumnFamily,
key: &[u8],
readopts: &ReadOptions,
) -> Result<Option<DBVector>, Error> {
if readopts.inner.is_null() {
return Err(Error::new("Unable to create RocksDB read options. \
return Err(Error::new(
"Unable to create RocksDB read options. \
This is a fairly trivial call, and its \
failure may be indicative of a \
mis-compiled or mis-loaded RocksDB \
library."
.to_owned()));
.to_owned(),
));
}
unsafe {
let mut val_len: size_t = 0;
let val = ffi_try!(ffi::rocksdb_get_cf(self.inner,
readopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len)) as *mut u8;
let val = ffi_try!(ffi::rocksdb_get_cf(
self.inner,
readopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len,
)) as *mut u8;
if val.is_null() {
Ok(None)
} else {
@ -782,28 +819,27 @@ impl DB {
}
}
pub fn get_cf(&self,
cf: ColumnFamily,
key: &[u8])
-> Result<Option<DBVector>, Error> {
pub fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<DBVector>, Error> {
self.get_cf_opt(cf, key, &ReadOptions::default())
}
pub fn create_cf(&mut self,
name: &str,
opts: &Options)
-> Result<ColumnFamily, Error> {
pub fn create_cf(&mut self, name: &str, opts: &Options) -> Result<ColumnFamily, Error> {
let cname = match CString::new(name.as_bytes()) {
Ok(c) => c,
Err(_) => {
return Err(Error::new("Failed to convert path to CString \
return Err(Error::new(
"Failed to convert path to CString \
when opening rocksdb"
.to_owned()))
.to_owned(),
))
}
};
let cf = unsafe {
let cf_handler =
ffi_try!(ffi::rocksdb_create_column_family(self.inner, opts.inner, cname.as_ptr()));
let cf_handler = ffi_try!(ffi::rocksdb_create_column_family(
self.inner,
opts.inner,
cname.as_ptr(),
));
let cf = ColumnFamily { inner: cf_handler };
self.cfs.insert(name.to_string(), cf);
cf
@ -814,10 +850,15 @@ impl DB {
pub fn drop_cf(&mut self, name: &str) -> Result<(), Error> {
let cf = self.cfs.get(name);
if cf.is_none() {
return Err(Error::new(format!("Invalid column family: {}", name).to_owned()));
return Err(Error::new(
format!("Invalid column family: {}", name).to_owned(),
));
}
unsafe {
ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.unwrap().inner));
ffi_try!(ffi::rocksdb_drop_column_family(
self.inner,
cf.unwrap().inner,
));
}
Ok(())
}
@ -832,10 +873,11 @@ impl DB {
DBIterator::new(self, &opts, mode)
}
pub fn iterator_cf(&self,
cf_handle: ColumnFamily,
mode: IteratorMode)
-> Result<DBIterator, Error> {
pub fn iterator_cf(
&self,
cf_handle: ColumnFamily,
mode: IteratorMode,
) -> Result<DBIterator, Error> {
let opts = ReadOptions::default();
DBIterator::new_cf(self, cf_handle, &opts, mode)
}
@ -845,9 +887,7 @@ impl DB {
DBRawIterator::new(self, &opts)
}
pub fn raw_iterator_cf(&self,
cf_handle: ColumnFamily)
-> Result<DBRawIterator, Error> {
pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
let opts = ReadOptions::default();
DBRawIterator::new_cf(self, cf_handle, &opts)
}
@ -858,89 +898,105 @@ impl DB {
pub fn put_opt(&self, key: &[u8], value: &[u8], writeopts: &WriteOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_put(self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t));
ffi_try!(ffi::rocksdb_put(
self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
pub fn put_cf_opt(&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions)
-> Result<(), Error> {
pub fn put_cf_opt(
&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions,
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_put_cf(self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t));
ffi_try!(ffi::rocksdb_put_cf(
self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
pub fn merge_opt(&self,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions)
-> Result<(), Error> {
pub fn merge_opt(
&self,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions,
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_merge(self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t));
ffi_try!(ffi::rocksdb_merge(
self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
pub fn merge_cf_opt(&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions)
-> Result<(), Error> {
pub fn merge_cf_opt(
&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8],
writeopts: &WriteOptions,
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_merge_cf(self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t));
ffi_try!(ffi::rocksdb_merge_cf(
self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
));
Ok(())
}
}
pub fn delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_delete(self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t));
ffi_try!(ffi::rocksdb_delete(
self.inner,
writeopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
));
Ok(())
}
}
pub fn delete_cf_opt(&self,
cf: ColumnFamily,
key: &[u8],
writeopts: &WriteOptions)
-> Result<(), Error> {
pub fn delete_cf_opt(
&self,
cf: ColumnFamily,
key: &[u8],
writeopts: &WriteOptions,
) -> Result<(), Error> {
unsafe {
ffi_try!(ffi::rocksdb_delete_cf(self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t));
ffi_try!(ffi::rocksdb_delete_cf(
self.inner,
writeopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
));
Ok(())
}
}
@ -949,11 +1005,7 @@ impl DB {
self.put_opt(key, value, &WriteOptions::default())
}
pub fn put_cf(&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8])
-> Result<(), Error> {
pub fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.put_cf_opt(cf, key, value, &WriteOptions::default())
}
@ -961,11 +1013,7 @@ impl DB {
self.merge_opt(key, value, &WriteOptions::default())
}
pub fn merge_cf(&self,
cf: ColumnFamily,
key: &[u8],
value: &[u8])
-> Result<(), Error> {
pub fn merge_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
self.merge_cf_opt(cf, key, value, &WriteOptions::default())
}
@ -973,34 +1021,32 @@ impl DB {
self.delete_opt(key, &WriteOptions::default())
}
pub fn delete_cf(&self,
cf: ColumnFamily,
key: &[u8])
-> Result<(), Error> {
pub fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<(), Error> {
self.delete_cf_opt(cf, key, &WriteOptions::default())
}
pub fn compact_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) {
unsafe {
ffi::rocksdb_compact_range(self.inner,
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
opt_bytes_to_ptr(end),
end.map_or(0, |e| e.len()) as size_t);
ffi::rocksdb_compact_range(
self.inner,
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
opt_bytes_to_ptr(end),
end.map_or(0, |e| e.len()) as size_t,
);
}
}
pub fn compact_range_cf(&self,
cf: ColumnFamily,
start: Option<&[u8]>,
end: Option<&[u8]>) {
pub fn compact_range_cf(&self, cf: ColumnFamily, start: Option<&[u8]>, end: Option<&[u8]>) {
unsafe {
ffi::rocksdb_compact_range_cf(self.inner,
cf.inner,
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
opt_bytes_to_ptr(end),
end.map_or(0, |e| e.len()) as size_t);
ffi::rocksdb_compact_range_cf(
self.inner,
cf.inner,
opt_bytes_to_ptr(start),
start.map_or(0, |s| s.len()) as size_t,
opt_bytes_to_ptr(end),
end.map_or(0, |e| e.len()) as size_t,
);
}
}
}
@ -1017,54 +1063,54 @@ impl WriteBatch {
/// Insert a value into the database under the given key.
pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_put(self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t);
ffi::rocksdb_writebatch_put(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
);
Ok(())
}
}
pub fn put_cf(&mut self,
cf: ColumnFamily,
key: &[u8],
value: &[u8])
-> Result<(), Error> {
pub fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_put_cf(self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t);
ffi::rocksdb_writebatch_put_cf(
self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
);
Ok(())
}
}
pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_merge(self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t);
ffi::rocksdb_writebatch_merge(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
);
Ok(())
}
}
pub fn merge_cf(&mut self,
cf: ColumnFamily,
key: &[u8],
value: &[u8])
-> Result<(), Error> {
pub fn merge_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_merge_cf(self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t);
ffi::rocksdb_writebatch_merge_cf(
self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
value.as_ptr() as *const c_char,
value.len() as size_t,
);
Ok(())
}
}
@ -1074,22 +1120,23 @@ impl WriteBatch {
/// Returns an error if the key was not found.
pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_delete(self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t);
ffi::rocksdb_writebatch_delete(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
);
Ok(())
}
}
pub fn delete_cf(&mut self,
cf: ColumnFamily,
key: &[u8])
-> Result<(), Error> {
pub fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<(), Error> {
unsafe {
ffi::rocksdb_writebatch_delete_cf(self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t);
ffi::rocksdb_writebatch_delete_cf(
self.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
);
Ok(())
}
}
@ -1149,9 +1196,11 @@ impl ReadOptions {
pub fn set_iterate_upper_bound(&mut self, key: &[u8]) {
unsafe {
ffi::rocksdb_readoptions_set_iterate_upper_bound(self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t);
ffi::rocksdb_readoptions_set_iterate_upper_bound(
self.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
);
}
}
}
@ -1310,9 +1359,11 @@ fn iterator_test() {
assert!(p.is_ok());
let iter = db.iterator(IteratorMode::Start);
for (k, v) in iter {
println!("Hello {}: {}",
str::from_utf8(&*k).unwrap(),
str::from_utf8(&*v).unwrap());
println!(
"Hello {}: {}",
str::from_utf8(&*k).unwrap(),
str::from_utf8(&*v).unwrap()
);
}
}
let opts = Options::default();

@ -16,13 +16,13 @@
use {BlockBasedOptions, DBCompactionStyle, DBCompressionType, DBRecoveryMode, Options,
WriteOptions};
use compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn, filter_callback};
use comparator::{self, ComparatorCallback, CompareFn};
use ffi;
use libc::{self, c_int, c_uchar, c_uint, c_void, size_t, uint64_t};
use merge_operator::{self, MergeFn, MergeOperatorCallback, full_merge_callback,
partial_merge_callback};
use compaction_filter::{self, CompactionFilterCallback, CompactionFilterFn, filter_callback};
use std::ffi::{CStr, CString};
use std::mem;
@ -124,7 +124,8 @@ impl Options {
unsafe {
ffi::rocksdb_options_optimize_level_style_compaction(
self.inner,
memtable_memory_budget as uint64_t);
memtable_memory_budget as uint64_t,
);
}
}
@ -190,10 +191,12 @@ impl Options {
/// ```
pub fn set_compression_per_level(&mut self, level_types: &[DBCompressionType]) {
unsafe {
let level_types: Vec<_> = level_types.iter().map(|&t| t as c_int).collect();
ffi::rocksdb_options_set_compression_per_level(self.inner,
level_types.as_ptr(),
level_types.len() as size_t)
let mut level_types: Vec<_> = level_types.iter().map(|&t| t as c_int).collect();
ffi::rocksdb_options_set_compression_per_level(
self.inner,
level_types.as_mut_ptr(),
level_types.len() as size_t,
)
}
}
@ -204,17 +207,20 @@ impl Options {
});
unsafe {
let mo = ffi::rocksdb_mergeoperator_create(mem::transmute(cb),
Some(merge_operator::destructor_callback),
Some(full_merge_callback),
Some(partial_merge_callback),
None,
Some(merge_operator::name_callback));
let mo = ffi::rocksdb_mergeoperator_create(
mem::transmute(cb),
Some(merge_operator::destructor_callback),
Some(full_merge_callback),
Some(partial_merge_callback),
None,
Some(merge_operator::name_callback),
);
ffi::rocksdb_options_set_merge_operator(self.inner, mo);
}
}
#[deprecated(since="0.5.0", note="add_merge_operator has been renamed to set_merge_operator")]
#[deprecated(since = "0.5.0",
note = "add_merge_operator has been renamed to set_merge_operator")]
pub fn add_merge_operator(&mut self, name: &str, merge_fn: MergeFn) {
self.set_merge_operator(name, merge_fn);
}
@ -230,7 +236,8 @@ impl Options {
/// If multi-threaded compaction is used, `filter_fn` may be called multiple times
/// simultaneously.
pub fn set_compaction_filter<F>(&mut self, name: &str, filter_fn: F)
where F: CompactionFilterFn + Send + 'static
where
F: CompactionFilterFn + Send + 'static,
{
let cb = Box::new(CompactionFilterCallback {
name: CString::new(name.as_bytes()).unwrap(),
@ -238,10 +245,12 @@ impl Options {
});
unsafe {
let cf = ffi::rocksdb_compactionfilter_create(mem::transmute(cb),
Some(compaction_filter::destructor_callback::<F>),
Some(filter_callback::<F>),
Some(compaction_filter::name_callback::<F>));
let cf = ffi::rocksdb_compactionfilter_create(
mem::transmute(cb),
Some(compaction_filter::destructor_callback::<F>),
Some(filter_callback::<F>),
Some(compaction_filter::name_callback::<F>),
);
ffi::rocksdb_options_set_compaction_filter(self.inner, cf);
}
}
@ -259,10 +268,12 @@ impl Options {
});
unsafe {
let cmp = ffi::rocksdb_comparator_create(mem::transmute(cb),
Some(comparator::destructor_callback),
Some(comparator::compare_callback),
Some(comparator::name_callback));
let cmp = ffi::rocksdb_comparator_create(
mem::transmute(cb),
Some(comparator::destructor_callback),
Some(comparator::compare_callback),
Some(comparator::name_callback),
);
ffi::rocksdb_options_set_comparator(self.inner, cmp);
}
}
@ -365,8 +376,9 @@ impl Options {
/// opts.set_allow_concurrent_memtable_write(false);
/// ```
pub fn set_allow_concurrent_memtable_write(&mut self, allow: bool) {
unsafe { ffi::rocksdb_options_set_allow_concurrent_memtable_write(self.inner,
allow as c_uchar) }
unsafe {
ffi::rocksdb_options_set_allow_concurrent_memtable_write(self.inner, allow as c_uchar)
}
}
/// Enable direct I/O mode for reading
@ -413,8 +425,10 @@ impl Options {
/// ```
pub fn set_use_direct_io_for_flush_and_compaction(&mut self, enabled: bool) {
unsafe {
ffi::rocksdb_options_set_use_direct_io_for_flush_and_compaction(self.inner,
enabled as c_uchar);
ffi::rocksdb_options_set_use_direct_io_for_flush_and_compaction(
self.inner,
enabled as c_uchar,
);
}
}
@ -446,7 +460,8 @@ impl Options {
/// let mut opts = Options::default();
/// opts.set_allow_os_buffer(false);
/// ```
#[deprecated(since="0.7.0", note="replaced with set_use_direct_reads/set_use_direct_io_for_flush_and_compaction methods")]
#[deprecated(since = "0.7.0",
note = "replaced with set_use_direct_reads/set_use_direct_io_for_flush_and_compaction methods")]
pub fn set_allow_os_buffer(&mut self, is_allow: bool) {
self.set_use_direct_reads(!is_allow);
self.set_use_direct_io_for_flush_and_compaction(!is_allow);
@ -599,9 +614,9 @@ impl Options {
/// use rocksdb::Options;
///
/// let mut opts = Options::default();
/// opts.set_max_bytes_for_level_multiplier(4);
/// opts.set_max_bytes_for_level_multiplier(4.0);
/// ```
pub fn set_max_bytes_for_level_multiplier(&mut self, mul: i32) {
pub fn set_max_bytes_for_level_multiplier(&mut self, mul: f64) {
unsafe {
ffi::rocksdb_options_set_max_bytes_for_level_multiplier(self.inner, mul);
}

@ -44,11 +44,12 @@ pub mod compaction_filter;
mod db;
mod db_options;
pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode, DBVector,
ReadOptions, Direction, IteratorMode, Snapshot, WriteBatch, new_bloom_filter};
pub use compaction_filter::Decision as CompactionDecision;
pub use db::{DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode,
DBVector, ReadOptions, Direction, IteratorMode, Snapshot, WriteBatch,
new_bloom_filter};
pub use merge_operator::MergeOperands;
pub use compaction_filter::Decision as CompactionDecision;
use std::collections::BTreeMap;
use std::error;
use std::fmt;

@ -76,17 +76,18 @@ pub unsafe extern "C" fn name_callback(raw_cb: *mut c_void) -> *const c_char {
cb.name.as_ptr()
}
pub unsafe extern "C" fn full_merge_callback(raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
existing_value: *const c_char,
existing_value_len: size_t,
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t)
-> *mut c_char {
pub unsafe extern "C" fn full_merge_callback(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
existing_value: *const c_char,
existing_value_len: size_t,
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
@ -102,15 +103,16 @@ pub unsafe extern "C" fn full_merge_callback(raw_cb: *mut c_void,
buf as *mut c_char
}
pub unsafe extern "C" fn partial_merge_callback(raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t)
-> *mut c_char {
pub unsafe extern "C" fn partial_merge_callback(
raw_cb: *mut c_void,
raw_key: *const c_char,
key_len: size_t,
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int,
success: *mut u8,
new_value_length: *mut size_t,
) -> *mut c_char {
let cb = &mut *(raw_cb as *mut MergeOperatorCallback);
let operands = &mut MergeOperands::new(operands_list, operands_list_len, num_operands);
let key = slice::from_raw_parts(raw_key as *const u8, key_len as usize);
@ -134,10 +136,11 @@ pub struct MergeOperands {
}
impl MergeOperands {
fn new(operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int)
-> MergeOperands {
fn new(
operands_list: *const *const c_char,
operands_list_len: *const size_t,
num_operands: c_int,
) -> MergeOperands {
assert!(num_operands >= 0);
MergeOperands {
operands_list: operands_list,
@ -164,8 +167,10 @@ impl<'a> Iterator for &'a mut MergeOperands {
let len = *len_ptr as usize;
let ptr = base + (spacing * self.cursor);
self.cursor += 1;
Some(mem::transmute(slice::from_raw_parts(*(ptr as *const *const u8) as *const u8,
len)))
Some(mem::transmute(slice::from_raw_parts(
*(ptr as *const *const u8) as *const u8,
len,
)))
}
}
}
@ -178,10 +183,11 @@ impl<'a> Iterator for &'a mut MergeOperands {
#[cfg(test)]
#[allow(unused_variables)]
fn test_provided_merge(new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &mut MergeOperands)
-> Vec<u8> {
fn test_provided_merge(
new_key: &[u8],
existing_val: Option<&[u8]>,
operands: &mut MergeOperands,
) -> Vec<u8> {
let nops = operands.size_hint().0;
let mut result: Vec<u8> = Vec::with_capacity(nops);
if let Some(v) = existing_val {

Loading…
Cancel
Save