Merge pull request #253 from iSynaptic/ergonomics-and-safety

Ergonomics and safety improvements
Merged into master. Jordan Terrell committed via GitHub, 6 years ago.
commit 13be813880
Files changed:

  1. src/compaction_filter.rs (2 changes)
  2. src/db.rs (309 changes)
  3. src/ffi_util.rs (4 changes)
  4. src/lib.rs (12 changes)
  5. src/merge_operator.rs (4 changes)
  6. tests/test_checkpoint.rs (27 changes)
  7. tests/test_column_family.rs (4 changes)
  8. tests/test_compationfilter.rs (51 changes)
  9. tests/test_db.rs (181 changes)

src/compaction_filter.rs

@@ -126,7 +126,7 @@ fn compaction_filter_test() {
         let _ = db.put(b"k1", b"a");
         let _ = db.put(b"_k", b"b");
         let _ = db.put(b"%k", b"c");
-        db.compact_range(None, None);
+        db.compact_range(None::<&[u8]>, None::<&[u8]>);
         assert_eq!(&*db.get(b"k1").unwrap().unwrap(), b"a");
         assert!(db.get(b"_k").unwrap().is_none());
         assert_eq!(&*db.get(b"%k").unwrap().unwrap(), b"secret");

src/db.rs

@@ -22,6 +22,7 @@ use std::collections::BTreeMap;
 use std::ffi::{CStr, CString};
 use std::fmt;
 use std::fs;
+use std::marker::PhantomData;
 use std::ops::Deref;
 use std::path::Path;
 use std::ptr;

@@ -29,10 +30,6 @@ use std::slice;
 use std::str;
 use std::sync::{Arc, RwLock};
-pub fn new_bloom_filter(bits: c_int) -> *mut ffi::rocksdb_filterpolicy_t {
-    unsafe { ffi::rocksdb_filterpolicy_create_bloom(bits) }
-}
 unsafe impl Send for DB {}
 unsafe impl Sync for DB {}
@@ -155,8 +152,9 @@ pub struct Snapshot<'a> {
 /// }
 /// let _ = DB::destroy(&Options::default(), path);
 /// ```
-pub struct DBRawIterator {
+pub struct DBRawIterator<'a> {
     inner: *mut ffi::rocksdb_iterator_t,
+    db: PhantomData<&'a DB>
 }

 /// An iterator over a database or column family, with specifiable

@@ -190,13 +188,13 @@ pub struct DBRawIterator {
 /// }
 /// let _ = DB::destroy(&Options::default(), path);
 /// ```
-pub struct DBIterator {
-    raw: DBRawIterator,
+pub struct DBIterator<'a> {
+    raw: DBRawIterator<'a>,
     direction: Direction,
     just_seeked: bool,
 }

-unsafe impl Send for DBIterator {}
+unsafe impl<'a> Send for DBIterator<'a> {}

 pub enum Direction {
     Forward,

@@ -211,11 +209,12 @@ pub enum IteratorMode<'a> {
     From(&'a [u8], Direction),
 }

-impl DBRawIterator {
-    fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator {
+impl<'a> DBRawIterator<'a> {
+    fn new(db: &DB, readopts: &ReadOptions) -> DBRawIterator<'a> {
         unsafe {
             DBRawIterator {
                 inner: ffi::rocksdb_create_iterator(db.inner, readopts.inner),
+                db: PhantomData
             }
         }
     }

@@ -224,10 +223,11 @@ impl DBRawIterator {
         db: &DB,
         cf_handle: ColumnFamily,
         readopts: &ReadOptions,
-    ) -> Result<DBRawIterator, Error> {
+    ) -> Result<DBRawIterator<'a>, Error> {
         unsafe {
             Ok(DBRawIterator {
                 inner: ffi::rocksdb_create_iterator_cf(db.inner, readopts.inner, cf_handle.inner),
+                db: PhantomData
             })
         }
     }
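The `PhantomData<&'a DB>` field added above is the heart of the safety change: every raw iterator (and every `DBIterator` built on top of it) now borrows the `DB` it came from, so the borrow checker rejects code in which an iterator could outlive its database. A hypothetical snippet that previously compiled but is now rejected:

    // error[E0597]: `db` does not live long enough
    let iter = {
        let db = DB::open_default("_rocksdb_iterator_lifetime_example").unwrap();
        db.iterator(IteratorMode::Start)
    }; // `db` (and the underlying C handle) would be destroyed here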
@@ -337,7 +337,9 @@ impl DBRawIterator {
     /// }
     /// let _ = DB::destroy(&Options::default(), path);
     /// ```
-    pub fn seek(&mut self, key: &[u8]) {
+    pub fn seek<K: AsRef<[u8]>>(&mut self, key: K) {
+        let key = key.as_ref();
         unsafe {
             ffi::rocksdb_iter_seek(
                 self.inner,

@@ -374,7 +376,9 @@ impl DBRawIterator {
     /// }
     /// let _ = DB::destroy(&Options::default(), path);
     /// ```
-    pub fn seek_for_prev(&mut self, key: &[u8]) {
+    pub fn seek_for_prev<K: AsRef<[u8]>>(&mut self, key: K) {
+        let key = key.as_ref();
         unsafe {
             ffi::rocksdb_iter_seek_for_prev(
                 self.inner,
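With `seek` and `seek_for_prev` generic over `AsRef<[u8]>`, callers can pass byte strings, string slices, or owned buffers without converting by hand. A short sketch (assumes `db: DB` is open):

    let mut iter = db.raw_iterator();
    iter.seek(b"k1");                      // byte-string literal
    iter.seek("k1");                       // &str works the same way
    iter.seek_for_prev(vec![b'k', b'9']);  // so does an owned Vec<u8>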
@@ -451,7 +455,7 @@ impl DBRawIterator {
     }
 }

-impl Drop for DBRawIterator {
+impl<'a> Drop for DBRawIterator<'a> {
     fn drop(&mut self) {
         unsafe {
             ffi::rocksdb_iter_destroy(self.inner);

@@ -459,8 +463,8 @@ impl Drop for DBRawIterator {
     }
 }

-impl DBIterator {
-    fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator {
+impl<'a> DBIterator<'a> {
+    fn new(db: &DB, readopts: &ReadOptions, mode: IteratorMode) -> DBIterator<'a> {
         let mut rv = DBIterator {
             raw: DBRawIterator::new(db, readopts),
             direction: Direction::Forward, // blown away by set_mode()

@@ -475,7 +479,7 @@ impl DBIterator {
         cf_handle: ColumnFamily,
         readopts: &ReadOptions,
         mode: IteratorMode,
-    ) -> Result<DBIterator, Error> {
+    ) -> Result<DBIterator<'a>, Error> {
         let mut rv = DBIterator {
             raw: DBRawIterator::new_cf(db, cf_handle, readopts)?,
             direction: Direction::Forward, // blown away by set_mode()

@@ -513,7 +517,7 @@ impl DBIterator {
     }
 }

-impl Iterator for DBIterator {
+impl<'a> Iterator for DBIterator<'a> {
     type Item = KVBytes;
     fn next(&mut self) -> Option<KVBytes> {

@@ -540,8 +544,8 @@ impl Iterator for DBIterator {
     }
 }

-impl Into<DBRawIterator> for DBIterator {
-    fn into(self) -> DBRawIterator {
+impl<'a> Into<DBRawIterator<'a>> for DBIterator<'a> {
+    fn into(self) -> DBRawIterator<'a> {
         self.raw
     }
 }
@@ -556,43 +560,72 @@ impl<'a> Snapshot<'a> {
     }
     pub fn iterator(&self, mode: IteratorMode) -> DBIterator {
-        let mut readopts = ReadOptions::default();
+        let readopts = ReadOptions::default();
+        self.iterator_opt(mode, readopts)
+    }
+    pub fn iterator_cf(
+        &self,
+        cf_handle: ColumnFamily,
+        mode: IteratorMode,
+    ) -> Result<DBIterator, Error> {
+        let readopts = ReadOptions::default();
+        self.iterator_cf_opt(cf_handle, readopts, mode)
+    }
+    pub fn iterator_opt(&self, mode: IteratorMode, mut readopts: ReadOptions) -> DBIterator {
         readopts.set_snapshot(self);
         DBIterator::new(self.db, &readopts, mode)
     }
-    pub fn iterator_cf(
+    pub fn iterator_cf_opt(
         &self,
         cf_handle: ColumnFamily,
+        mut readopts: ReadOptions,
         mode: IteratorMode,
     ) -> Result<DBIterator, Error> {
-        let mut readopts = ReadOptions::default();
         readopts.set_snapshot(self);
         DBIterator::new_cf(self.db, cf_handle, &readopts, mode)
     }
     pub fn raw_iterator(&self) -> DBRawIterator {
-        let mut readopts = ReadOptions::default();
+        let readopts = ReadOptions::default();
+        self.raw_iterator_opt(readopts)
+    }
+    pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
+        let readopts = ReadOptions::default();
+        self.raw_iterator_cf_opt(cf_handle, readopts)
+    }
+    pub fn raw_iterator_opt(&self, mut readopts: ReadOptions) -> DBRawIterator {
         readopts.set_snapshot(self);
         DBRawIterator::new(self.db, &readopts)
     }
-    pub fn raw_iterator_cf(&self, cf_handle: ColumnFamily) -> Result<DBRawIterator, Error> {
-        let mut readopts = ReadOptions::default();
+    pub fn raw_iterator_cf_opt(&self, cf_handle: ColumnFamily, mut readopts: ReadOptions) -> Result<DBRawIterator, Error> {
         readopts.set_snapshot(self);
         DBRawIterator::new_cf(self.db, cf_handle, &readopts)
     }
-    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Error> {
-        let mut readopts = ReadOptions::default();
+    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
+        let readopts = ReadOptions::default();
+        self.get_opt(key, readopts)
+    }
+    pub fn get_cf<K: AsRef<[u8]>>(&self, cf: ColumnFamily, key: K) -> Result<Option<DBVector>, Error> {
+        let readopts = ReadOptions::default();
+        self.get_cf_opt(cf, key.as_ref(), readopts)
+    }
+    pub fn get_opt<K: AsRef<[u8]>>(&self, key: K, mut readopts: ReadOptions) -> Result<Option<DBVector>, Error> {
         readopts.set_snapshot(self);
-        self.db.get_opt(key, &readopts)
+        self.db.get_opt(key.as_ref(), &readopts)
     }
-    pub fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<DBVector>, Error> {
-        let mut readopts = ReadOptions::default();
+    pub fn get_cf_opt<K: AsRef<[u8]>>(&self, cf: ColumnFamily, key: K, mut readopts: ReadOptions) -> Result<Option<DBVector>, Error> {
         readopts.set_snapshot(self);
-        self.db.get_cf_opt(cf, key, &readopts)
+        self.db.get_cf_opt(cf, key.as_ref(), &readopts)
     }
 }
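The pattern in this hunk: each existing Snapshot method keeps its simple signature and delegates to a new `*_opt` twin that accepts caller-supplied `ReadOptions` (the snapshot is still attached internally). A usage sketch (assumes `db: DB` with a key "k1"):

    let snapshot = db.snapshot();
    // Simple form: a default ReadOptions is created for you.
    let v1 = snapshot.get("k1").unwrap();
    // Explicit form: configure your own ReadOptions first, then hand it over;
    // set_snapshot() is still called on it inside get_opt().
    let readopts = ReadOptions::default();
    let v2 = snapshot.get_opt("k1", readopts).unwrap();
    assert_eq!(v1.is_some(), v2.is_some());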
@@ -724,7 +757,7 @@ impl DB {
             for (n, h) in cfs_v.iter().zip(cfhandles) {
                 cf_map.write()
                     .map_err(|e| Error::new(e.to_string()))?
-                    .insert(n.name.clone(), ColumnFamily { inner: h });
+                    .insert(n.name.clone(), h);
             }
         }

@@ -796,7 +829,7 @@ impl DB {
         self.write_opt(batch, &wo)
     }
-    pub fn get_opt(&self, key: &[u8], readopts: &ReadOptions) -> Result<Option<DBVector>, Error> {
+    pub fn get_opt<K: AsRef<[u8]>>(&self, key: K, readopts: &ReadOptions) -> Result<Option<DBVector>, Error> {
         if readopts.inner.is_null() {
             return Err(Error::new(
                 "Unable to create RocksDB read options. \

@@ -808,6 +841,8 @@ impl DB {
             ));
         }
+        let key = key.as_ref();
         unsafe {
             let mut val_len: size_t = 0;
             let val = ffi_try!(ffi::rocksdb_get(

@@ -826,14 +861,14 @@ impl DB {
     }
     /// Return the bytes associated with a key value
-    pub fn get(&self, key: &[u8]) -> Result<Option<DBVector>, Error> {
-        self.get_opt(key, &ReadOptions::default())
+    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> {
+        self.get_opt(key.as_ref(), &ReadOptions::default())
     }
-    pub fn get_cf_opt(
+    pub fn get_cf_opt<K: AsRef<[u8]>>(
         &self,
         cf: ColumnFamily,
-        key: &[u8],
+        key: K,
         readopts: &ReadOptions,
     ) -> Result<Option<DBVector>, Error> {
         if readopts.inner.is_null() {

@@ -847,6 +882,8 @@ impl DB {
             ));
         }
+        let key = key.as_ref();
         unsafe {
             let mut val_len: size_t = 0;
             let val = ffi_try!(ffi::rocksdb_get_cf(

@@ -865,8 +902,8 @@ impl DB {
         }
     }
-    pub fn get_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<Option<DBVector>, Error> {
-        self.get_cf_opt(cf, key, &ReadOptions::default())
+    pub fn get_cf<K: AsRef<[u8]>>(&self, cf: ColumnFamily, key: K) -> Result<Option<DBVector>, Error> {
+        self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
     }
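Reads get the same `AsRef<[u8]>` treatment: `get`, `get_cf`, and their `_opt` variants accept any byte-like key, so string keys no longer need an explicit `as_bytes()`. A small sketch with an open `db: DB`:

    db.put("my key", "my value").unwrap();
    let from_str   = db.get("my key").unwrap();   // &str key
    let from_bytes = db.get(b"my key").unwrap();  // byte-string key
    assert_eq!(from_str.is_some(), from_bytes.is_some());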
     pub fn create_cf(&self, name: &str, opts: &Options) -> Result<ColumnFamily, Error> {

@@ -881,15 +918,19 @@ impl DB {
             }
         };
         let cf = unsafe {
-            let cf_handler = ffi_try!(ffi::rocksdb_create_column_family(
+            let cf_handle = ffi_try!(ffi::rocksdb_create_column_family(
                 self.inner,
                 opts.inner,
                 cname.as_ptr(),
             ));
-            let cf = ColumnFamily { inner: cf_handler };
             self.cfs.write().map_err(|e| Error::new(e.to_string()))?
-                .insert(name.to_string(), cf);
+                .insert(name.to_string(), cf_handle);
-            cf
+            ColumnFamily {
+                inner: cf_handle,
+                db: PhantomData,
+            }
         };
         Ok(cf)
     }

@@ -898,7 +939,7 @@ impl DB {
         if let Some(cf) = self.cfs.write().map_err(|e| Error::new(e.to_string()))?
             .remove(name) {
             unsafe {
-                ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf.inner,));
+                ffi_try!(ffi::rocksdb_drop_column_family(self.inner, cf,));
             }
             Ok(())
         } else {

@@ -910,12 +951,23 @@ impl DB {
     /// Return the underlying column family handle.
     pub fn cf_handle(&self, name: &str) -> Option<ColumnFamily> {
-        self.cfs.read().ok()?.get(name).cloned()
+        self.cfs
+            .read()
+            .ok()?
+            .get(name)
+            .map(|h| ColumnFamily {
+                inner: *h,
+                db: PhantomData
+            })
     }
     pub fn iterator(&self, mode: IteratorMode) -> DBIterator {
-        let opts = ReadOptions::default();
-        DBIterator::new(self, &opts, mode)
+        let readopts = ReadOptions::default();
+        self.iterator_opt(mode, &readopts)
+    }
+    pub fn iterator_opt(&self, mode: IteratorMode, readopts: &ReadOptions) -> DBIterator {
+        DBIterator::new(self, &readopts, mode)
     }
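`ColumnFamily` is now rebuilt on demand from the raw handle map as a cheap `Copy` value that borrows the `DB`, so handles cannot outlive the database that owns them. Typical usage is unchanged; a sketch (path name assumed):

    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "_rocksdb_cf_handle_example").unwrap();
    db.create_cf("new_cf", &Options::default()).unwrap();
    let cf = db.cf_handle("new_cf").unwrap(); // Copy handle tied to `db`'s lifetime
    db.put_cf(cf, "k1", "v1").unwrap();
    assert!(db.get_cf(cf, "k1").unwrap().is_some());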
     /// Opens an interator with `set_total_order_seek` enabled.

@@ -927,10 +979,10 @@ impl DB {
         DBIterator::new(self, &opts, mode)
     }
-    pub fn prefix_iterator(&self, prefix: &[u8]) -> DBIterator {
+    pub fn prefix_iterator<P: AsRef<[u8]>>(&self, prefix: P) -> DBIterator {
         let mut opts = ReadOptions::default();
         opts.set_prefix_same_as_start(true);
-        DBIterator::new(self, &opts, IteratorMode::From(prefix, Direction::Forward))
+        DBIterator::new(self, &opts, IteratorMode::From(prefix.as_ref(), Direction::Forward))
     }
     pub fn iterator_cf(

@@ -952,10 +1004,10 @@ impl DB {
         DBIterator::new_cf(self, cf_handle, &opts, mode)
     }
-    pub fn prefix_iterator_cf(
+    pub fn prefix_iterator_cf<P: AsRef<[u8]>>(
         &self,
         cf_handle: ColumnFamily,
-        prefix: &[u8]
+        prefix: P
     ) -> Result<DBIterator, Error> {
         let mut opts = ReadOptions::default();
         opts.set_prefix_same_as_start(true);

@@ -963,7 +1015,7 @@ impl DB {
             self,
             cf_handle,
             &opts,
-            IteratorMode::From(prefix, Direction::Forward),
+            IteratorMode::From(prefix.as_ref(), Direction::Forward),
         )
     }
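`prefix_iterator` and `prefix_iterator_cf` accept any `P: AsRef<[u8]>` as well. Prefix iteration still relies on a prefix extractor being configured on the Options; a sketch assuming a fixed 4-byte prefix transform and an assumed path name:

    let mut opts = Options::default();
    opts.create_if_missing(true);
    opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(4));
    let db = DB::open(&opts, "_rocksdb_prefix_example").unwrap();
    db.put("key_1", "a").unwrap();
    db.put("key_2", "b").unwrap();
    db.put("other", "c").unwrap();
    // Visits only keys sharing the 4-byte prefix "key_".
    for (k, v) in db.prefix_iterator("key_") {
        println!("{:?} = {:?}", k, v);
    }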
@@ -981,8 +1033,15 @@ impl DB {
         Snapshot::new(self)
     }
-    pub fn put_opt(&self, key: &[u8], value: &[u8], writeopts: &WriteOptions) -> Result<(), Error> {
+    pub fn put_opt<K, V>(&self, key: K, value: V, writeopts: &WriteOptions) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_put(
                 self.inner,
                 writeopts.inner,

@@ -995,14 +1054,21 @@ impl DB {
         }
     }
-    pub fn put_cf_opt(
+    pub fn put_cf_opt<K, V>(
         &self,
         cf: ColumnFamily,
-        key: &[u8],
-        value: &[u8],
+        key: K,
+        value: V,
         writeopts: &WriteOptions,
-    ) -> Result<(), Error> {
+    ) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_put_cf(
                 self.inner,
                 writeopts.inner,

@@ -1016,12 +1082,18 @@ impl DB {
         }
     }
-    pub fn merge_opt(
+    pub fn merge_opt<K, V>(
         &self,
-        key: &[u8],
-        value: &[u8],
+        key: K,
+        value: V,
         writeopts: &WriteOptions,
-    ) -> Result<(), Error> {
+    ) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_merge(
                 self.inner,

@@ -1035,14 +1107,21 @@ impl DB {
         }
     }
-    pub fn merge_cf_opt(
+    pub fn merge_cf_opt<K, V>(
         &self,
         cf: ColumnFamily,
-        key: &[u8],
-        value: &[u8],
+        key: K,
+        value: V,
         writeopts: &WriteOptions,
-    ) -> Result<(), Error> {
+    ) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_merge_cf(
                 self.inner,
                 writeopts.inner,
@@ -1056,7 +1135,9 @@ impl DB {
         }
     }
-    pub fn delete_opt(&self, key: &[u8], writeopts: &WriteOptions) -> Result<(), Error> {
+    pub fn delete_opt<K: AsRef<[u8]>>(&self, key: K, writeopts: &WriteOptions) -> Result<(), Error> {
+        let key = key.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_delete(
                 self.inner,

@@ -1068,12 +1149,15 @@ impl DB {
         }
     }
-    pub fn delete_cf_opt(
+    pub fn delete_cf_opt<K: AsRef<[u8]>>(
         &self,
         cf: ColumnFamily,
-        key: &[u8],
+        key: K,
         writeopts: &WriteOptions,
     ) -> Result<(), Error> {
+        let key = key.as_ref();
         unsafe {
             ffi_try!(ffi::rocksdb_delete_cf(
                 self.inner,

@@ -1086,32 +1170,47 @@ impl DB {
         }
     }
-    pub fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
-        self.put_opt(key, value, &WriteOptions::default())
+    pub fn put<K, V>(&self, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        self.put_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
     }
-    pub fn put_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
-        self.put_cf_opt(cf, key, value, &WriteOptions::default())
+    pub fn put_cf<K, V>(&self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        self.put_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
     }
-    pub fn merge(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
-        self.merge_opt(key, value, &WriteOptions::default())
+    pub fn merge<K, V>(&self, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        self.merge_opt(key.as_ref(), value.as_ref(), &WriteOptions::default())
     }
-    pub fn merge_cf(&self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
-        self.merge_cf_opt(cf, key, value, &WriteOptions::default())
+    pub fn merge_cf<K, V>(&self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        self.merge_cf_opt(cf, key.as_ref(), value.as_ref(), &WriteOptions::default())
     }
-    pub fn delete(&self, key: &[u8]) -> Result<(), Error> {
-        self.delete_opt(key, &WriteOptions::default())
+    pub fn delete<K: AsRef<[u8]>>(&self, key: K) -> Result<(), Error> {
+        self.delete_opt(key.as_ref(), &WriteOptions::default())
     }
-    pub fn delete_cf(&self, cf: ColumnFamily, key: &[u8]) -> Result<(), Error> {
-        self.delete_cf_opt(cf, key, &WriteOptions::default())
+    pub fn delete_cf<K: AsRef<[u8]>>(&self, cf: ColumnFamily, key: K) -> Result<(), Error> {
+        self.delete_cf_opt(cf, key.as_ref(), &WriteOptions::default())
     }
-    pub fn compact_range(&self, start: Option<&[u8]>, end: Option<&[u8]>) {
+    pub fn compact_range<S: AsRef<[u8]>, E: AsRef<[u8]>>(&self, start: Option<S>, end: Option<E>) {
         unsafe {
+            let start = start.as_ref().map(|s| s.as_ref());
+            let end = end.as_ref().map(|e| e.as_ref());
             ffi::rocksdb_compact_range(
                 self.inner,
                 opt_bytes_to_ptr(start),
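Every write path (`put`, `merge`, `delete`, the `_cf` and `_opt` variants, and `compact_range`) now takes `AsRef<[u8]>` generics, so owned and borrowed key/value types mix freely. A short sketch with an open `db: DB`:

    let key: String = format!("user:{}", 42);
    let value: Vec<u8> = vec![1, 2, 3];
    db.put(&key, &value).unwrap();      // String and Vec<u8> via AsRef<[u8]>
    db.put("plain", b"bytes").unwrap(); // &str and byte-string literals
    db.delete(key).unwrap();            // an owned String works as a key too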
@@ -1184,7 +1283,13 @@ impl WriteBatch {
     }
     /// Insert a value into the database under the given key.
-    pub fn put(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
+    pub fn put<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_put(
                 self.inner,

@@ -1197,7 +1302,13 @@ impl WriteBatch {
         }
     }
-    pub fn put_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
+    pub fn put_cf<K, V>(&mut self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_put_cf(
                 self.inner,

@@ -1211,7 +1322,13 @@ impl WriteBatch {
         }
     }
-    pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<(), Error> {
+    pub fn merge<K, V>(&mut self, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_merge(
                 self.inner,

@@ -1224,7 +1341,13 @@ impl WriteBatch {
         }
     }
-    pub fn merge_cf(&mut self, cf: ColumnFamily, key: &[u8], value: &[u8]) -> Result<(), Error> {
+    pub fn merge_cf<K, V>(&mut self, cf: ColumnFamily, key: K, value: V) -> Result<(), Error>
+        where K: AsRef<[u8]>,
+              V: AsRef<[u8]> {
+        let key = key.as_ref();
+        let value = value.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_merge_cf(
                 self.inner,

@@ -1241,7 +1364,9 @@ impl WriteBatch {
     /// Remove the database entry for key.
     ///
     /// Returns an error if the key was not found.
-    pub fn delete(&mut self, key: &[u8]) -> Result<(), Error> {
+    pub fn delete<K: AsRef<[u8]>>(&mut self, key: K) -> Result<(), Error> {
+        let key = key.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_delete(
                 self.inner,

@@ -1252,7 +1377,9 @@ impl WriteBatch {
         }
     }
-    pub fn delete_cf(&mut self, cf: ColumnFamily, key: &[u8]) -> Result<(), Error> {
+    pub fn delete_cf<K: AsRef<[u8]>>(&mut self, cf: ColumnFamily, key: K) -> Result<(), Error> {
+        let key = key.as_ref();
         unsafe {
             ffi::rocksdb_writebatch_delete_cf(
                 self.inner,
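`WriteBatch` gets the same `AsRef<[u8]>` treatment, so a batch can be assembled from mixed key/value types and committed atomically with `DB::write`. Sketch (open `db: DB` assumed):

    let mut batch = WriteBatch::default();
    batch.put("k1", "v1").unwrap();
    batch.put(b"k2", vec![0u8, 1, 2]).unwrap();
    batch.delete("stale-key").unwrap();
    db.write(batch).unwrap(); // all three operations land together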
@@ -1292,7 +1419,7 @@ impl Drop for DB {
         unsafe {
             if let Ok(cfs) = self.cfs.read() {
                 for cf in cfs.values() {
-                    ffi::rocksdb_column_family_handle_destroy(cf.inner);
+                    ffi::rocksdb_column_family_handle_destroy(*cf);
                 }
             }
             ffi::rocksdb_close(self.inner);

@@ -1329,7 +1456,9 @@ impl ReadOptions {
         }
     }
-    pub fn set_iterate_upper_bound(&mut self, key: &[u8]) {
+    pub fn set_iterate_upper_bound<K: AsRef<[u8]>>(&mut self, key: K) {
+        let key = key.as_ref();
         unsafe {
             ffi::rocksdb_readoptions_set_iterate_upper_bound(
                 self.inner,
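Together with the new `DB::iterator_opt`, a bounded scan no longer needs the raw iterator API. A sketch (assumes the default bytewise key ordering and an open `db: DB`):

    let mut readopts = ReadOptions::default();
    readopts.set_iterate_upper_bound("k3"); // iteration stops before "k3"
    for (k, v) in db.iterator_opt(IteratorMode::Start, &readopts) {
        // only keys ordered before "k3" are visited
        println!("{:?} = {:?}", k, v);
    }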

src/ffi_util.rs

@@ -26,9 +26,9 @@ pub fn error_message(ptr: *const c_char) -> String {
     s
 }

-pub fn opt_bytes_to_ptr(opt: Option<&[u8]>) -> *const c_char {
+pub fn opt_bytes_to_ptr<T: AsRef<[u8]>>(opt: Option<T>) -> *const c_char {
     match opt {
-        Some(v) => v.as_ptr() as *const c_char,
+        Some(v) => v.as_ref().as_ptr() as *const c_char,
         None => ptr::null(),
     }
 }

src/lib.rs

@@ -71,8 +71,8 @@ mod slice_transform;
 pub use compaction_filter::Decision as CompactionDecision;
 pub use db::{
-    new_bloom_filter, DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator,
-    DBRecoveryMode, DBVector, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch,
+    DBCompactionStyle, DBCompressionType, DBIterator, DBRawIterator, DBRecoveryMode,
+    DBVector, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch,
 };
 pub use slice_transform::SliceTransform;

@@ -81,6 +81,7 @@ pub use merge_operator::MergeOperands;
 use std::collections::BTreeMap;
 use std::error;
 use std::fmt;
+use std::marker::PhantomData;
 use std::path::PathBuf;
 use std::sync::{Arc, RwLock};

@@ -89,7 +90,7 @@ use std::sync::{Arc, RwLock};
 /// See crate level documentation for a simple usage example.
 pub struct DB {
     inner: *mut ffi::rocksdb_t,
-    cfs: Arc<RwLock<BTreeMap<String, ColumnFamily>>>,
+    cfs: Arc<RwLock<BTreeMap<String, *mut ffi::rocksdb_column_family_handle_t>>>,
     path: PathBuf,
 }

@@ -244,8 +245,9 @@ pub struct WriteOptions {
 /// An opaque type used to represent a column family. Returned from some functions, and used
 /// in others
 #[derive(Copy, Clone)]
-pub struct ColumnFamily {
+pub struct ColumnFamily<'a> {
     inner: *mut ffi::rocksdb_column_family_handle_t,
+    db: PhantomData<&'a DB>,
 }

-unsafe impl Send for ColumnFamily {}
+unsafe impl<'a> Send for ColumnFamily<'a> {}
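Internally the DB now keeps plain raw column-family handles, and `ColumnFamily<'a>` is a thin `Copy` wrapper whose `PhantomData<&'a DB>` ties it to the database it came from, so a handle cannot be used after its DB is closed. A hypothetical snippet the borrow checker now rejects:

    // error[E0597]: `db` does not live long enough
    let cf = {
        let db = DB::open_default("_rocksdb_cf_lifetime_example").unwrap();
        db.create_cf("cf1", &Options::default()).unwrap()
    }; // dropping `db` destroys the underlying C column-family handle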

src/merge_operator.rs

@@ -371,7 +371,7 @@ mod test {
                 let _ = db.get(b"k2");
             }
         }
-        db.compact_range(None, None);
+        db.compact_range(None::<&[u8]>, None::<&[u8]>);
         let d1 = db.clone();
         let d2 = db.clone();
         let d3 = db.clone();

@@ -402,7 +402,7 @@ mod test {
                     let _ = d2.get(b"k2");
                 }
             }
-            d2.compact_range(None, None);
+            d2.compact_range(None::<&[u8]>, None::<&[u8]>);
         });
         h2.join().unwrap();
         let h3 = thread::spawn(move || {

tests/test_checkpoint.rs

@@ -13,18 +13,17 @@
 // limitations under the License.
 //
 extern crate rocksdb;
+mod util;
 use rocksdb::{checkpoint::Checkpoint, Options, DB};
-use std::fs::remove_dir_all;
+use util::DBPath;
 #[test]
 pub fn test_single_checkpoint() {
     const PATH_PREFIX: &str = "_rust_rocksdb_cp_single_";
     // Create DB with some data
-    let db_path = format!("{}db1", PATH_PREFIX);
-    let _ = remove_dir_all(&db_path);
+    let db_path = DBPath::new(&format!("{}db1", PATH_PREFIX));
     let mut opts = Options::default();
     opts.create_if_missing(true);

@@ -37,8 +36,7 @@ pub fn test_single_checkpoint() {
     // Create checkpoint
     let cp1 = Checkpoint::new(&db).unwrap();
-    let cp1_path = format!("{}cp1", PATH_PREFIX);
-    let _ = remove_dir_all(&cp1_path);
+    let cp1_path = DBPath::new(&format!("{}cp1", PATH_PREFIX));
     cp1.create_checkpoint(&cp1_path).unwrap();
     // Verify checkpoint

@@ -48,9 +46,6 @@ pub fn test_single_checkpoint() {
     assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"v2");
     assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3");
     assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4");
-    let _ = remove_dir_all(&db_path);
-    let _ = remove_dir_all(&cp1_path);
 }
 #[test]

@@ -58,8 +53,7 @@ pub fn test_multi_checkpoints() {
     const PATH_PREFIX: &str = "_rust_rocksdb_cp_multi_";
     // Create DB with some data
-    let db_path = format!("{}db1", PATH_PREFIX);
-    let _ = remove_dir_all(&db_path);
+    let db_path = DBPath::new(&format!("{}db1", PATH_PREFIX));
     let mut opts = Options::default();
     opts.create_if_missing(true);

@@ -72,8 +66,7 @@ pub fn test_multi_checkpoints() {
     // Create first checkpoint
     let cp1 = Checkpoint::new(&db).unwrap();
-    let cp1_path = format!("{}cp1", PATH_PREFIX);
-    let _ = remove_dir_all(&cp1_path);
+    let cp1_path = DBPath::new(&format!("{}cp1", PATH_PREFIX));
     cp1.create_checkpoint(&cp1_path).unwrap();
     // Verify checkpoint

@@ -84,8 +77,6 @@ pub fn test_multi_checkpoints() {
     assert_eq!(*cp.get(b"k3").unwrap().unwrap(), *b"v3");
     assert_eq!(*cp.get(b"k4").unwrap().unwrap(), *b"v4");
-    let _ = remove_dir_all(&cp1_path);
     // Change some existing keys
     db.put(b"k1", b"modified").unwrap();
     db.put(b"k2", b"changed").unwrap();

@@ -96,8 +87,7 @@ pub fn test_multi_checkpoints() {
     // Create another checkpoint
     let cp2 = Checkpoint::new(&db).unwrap();
-    let cp2_path = format!("{}cp2", PATH_PREFIX);
-    let _ = remove_dir_all(&cp2_path);
+    let cp2_path = DBPath::new(&format!("{}cp2", PATH_PREFIX));
     cp2.create_checkpoint(&cp2_path).unwrap();
     // Verify second checkpoint

@@ -107,7 +97,4 @@ pub fn test_multi_checkpoints() {
     assert_eq!(*cp.get(b"k2").unwrap().unwrap(), *b"changed");
     assert_eq!(*cp.get(b"k5").unwrap().unwrap(), *b"v5");
     assert_eq!(*cp.get(b"k6").unwrap().unwrap(), *b"v6");
-    let _ = remove_dir_all(&db_path);
-    let _ = remove_dir_all(&cp2_path);
 }
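These tests (and the new ones below) replace ad-hoc `remove_dir_all` calls with a shared `util::DBPath` helper that owns the test directory and cleans it up when dropped, so a panicking test no longer leaves directories behind. The helper itself is not shown in this diff; a minimal sketch of the idea (hypothetical `tests/util/mod.rs`, names and details assumed):

    // Hypothetical sketch of a DBPath-style helper: a path wrapper that
    // removes its directory when it goes out of scope, even on panic.
    use std::fs;
    use std::path::{Path, PathBuf};

    pub struct DBPath {
        path: PathBuf,
    }

    impl DBPath {
        pub fn new(prefix: &str) -> DBPath {
            DBPath { path: PathBuf::from(prefix) }
        }
    }

    impl Drop for DBPath {
        fn drop(&mut self) {
            let _ = fs::remove_dir_all(&self.path);
        }
    }

    // Lets `&DBPath` go anywhere a filesystem path is expected,
    // e.g. `DB::open(&opts, &db_path)` or `cp.create_checkpoint(&cp_path)`.
    impl AsRef<Path> for DBPath {
        fn as_ref(&self) -> &Path {
            &self.path
        }
    }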

tests/test_column_family.rs

@@ -185,7 +185,7 @@ pub fn test_column_family_with_options() {
     let cfs = vec![cf_descriptor];
     match DB::open_cf_descriptors(&opts, &n, cfs) {
-        Ok(_db) => println!("created db with column family descriptors succesfully"),
+        Ok(_db) => println!("created db with column family descriptors successfully"),
         Err(e) => {
             panic!(
                 "could not create new database with column family descriptors: {}",

@@ -204,7 +204,7 @@ pub fn test_column_family_with_options() {
     let cfs = vec![cf_descriptor];
     match DB::open_cf_descriptors(&opts, &n, cfs) {
-        Ok(_db) => println!("succesfully re-opened database with column family descriptors"),
+        Ok(_db) => println!("successfully re-opened database with column family descriptors"),
         Err(e) => {
             panic!(
                 "unable to re-open database with column family descriptors: {}",

tests/test_compationfilter.rs (new file)

@@ -0,0 +1,51 @@
// Copyright 2019 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate rocksdb;
mod util;
use rocksdb::{CompactionDecision, DB, Options};
use util::DBPath;
#[cfg(test)]
#[allow(unused_variables)]
fn test_filter(level: u32, key: &[u8], value: &[u8]) -> CompactionDecision {
use self::CompactionDecision::*;
match key.first() {
Some(&b'_') => Remove,
Some(&b'%') => Change(b"secret"),
_ => Keep,
}
}
#[test]
fn compaction_filter_test() {
use {Options, DB};
let path = DBPath::new("_rust_rocksdb_filtertest");
let mut opts = Options::default();
opts.create_if_missing(true);
opts.set_compaction_filter("test", test_filter);
{
let db = DB::open(&opts, &path).unwrap();
let _ = db.put(b"k1", b"a");
let _ = db.put(b"_k", b"b");
let _ = db.put(b"%k", b"c");
db.compact_range(None::<&[u8]>, None::<&[u8]>);
assert_eq!(&*db.get(b"k1").unwrap().unwrap(), b"a");
assert!(db.get(b"_k").unwrap().is_none());
assert_eq!(&*db.get(b"%k").unwrap().unwrap(), b"secret");
}
}

tests/test_db.rs (new file)

@@ -0,0 +1,181 @@
// Copyright 2019 Tyler Neely
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
extern crate rocksdb;
extern crate libc;
mod util;
use libc::{size_t};
use rocksdb::{DB, DBVector, Error, IteratorMode, Options, WriteBatch};
use util::DBPath;
#[test]
fn test_db_vector() {
use std::mem;
let len: size_t = 4;
let data: *mut u8 = unsafe { mem::transmute(libc::calloc(len, mem::size_of::<u8>())) };
let v = unsafe { DBVector::from_c(data, len) };
let ctrl = [0u8, 0, 0, 0];
assert_eq!(&*v, &ctrl[..]);
}
#[test]
fn external() {
let path = DBPath::new("_rust_rocksdb_externaltest");
{
let db = DB::open_default(&path).unwrap();
assert!(db.put(b"k1", b"v1111").is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
}
#[test]
fn errors_do_stuff() {
let path = DBPath::new("_rust_rocksdb_error");
let _db = DB::open_default(&path).unwrap();
let opts = Options::default();
// The DB will still be open when we try to destroy it and the lock should fail.
match DB::destroy(&opts, &path) {
Err(s) => {
let message = s.to_string();
assert!(message.find("IO error:").is_some());
assert!(message.find("_rust_rocksdb_error").is_some());
assert!(message.find("/LOCK:").is_some());
}
Ok(_) => panic!("should fail"),
}
}
#[test]
fn writebatch_works() {
let path = DBPath::new("_rust_rocksdb_writebacktest");
{
let db = DB::open_default(&path).unwrap();
{
// test put
let mut batch = WriteBatch::default();
assert!(db.get(b"k1").unwrap().is_none());
assert_eq!(batch.len(), 0);
assert!(batch.is_empty());
let _ = batch.put(b"k1", b"v1111");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none());
assert!(db.write(batch).is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111");
}
{
// test delete
let mut batch = WriteBatch::default();
let _ = batch.delete(b"k1");
assert_eq!(batch.len(), 1);
assert!(!batch.is_empty());
assert!(db.write(batch).is_ok());
assert!(db.get(b"k1").unwrap().is_none());
}
{
// test size_in_bytes
let mut batch = WriteBatch::default();
let before = batch.size_in_bytes();
let _ = batch.put(b"k1", b"v1234567890");
let after = batch.size_in_bytes();
assert!(before + 10 <= after);
}
}
}
#[test]
fn iterator_test() {
let path = DBPath::new("_rust_rocksdb_iteratortest");
{
let data = [(b"k1", b"v1111"), (b"k2", b"v2222"), (b"k3", b"v3333")];
let db = DB::open_default(&path).unwrap();
for (key, value) in &data {
assert!(db.put(key, value).is_ok());
}
let iter = db.iterator(IteratorMode::Start);
for (idx, (db_key, db_value)) in iter.enumerate() {
let (key, value) = data[idx];
assert_eq!((&key[..], &value[..]), (db_key.as_ref(), db_value.as_ref()));
}
}
}
#[test]
fn snapshot_test() {
let path = DBPath::new("_rust_rocksdb_snapshottest");
{
let db = DB::open_default(&path).unwrap();
assert!(db.put(b"k1", b"v1111").is_ok());
let snap = db.snapshot();
assert!(snap.get(b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1111");
assert!(db.put(b"k2", b"v2222").is_ok());
assert!(db.get(b"k2").unwrap().is_some());
assert!(snap.get(b"k2").unwrap().is_none());
}
}
#[test]
fn set_option_test() {
let path = DBPath::new("_rust_rocksdb_set_optionstest");
{
let db = DB::open_default(&path).unwrap();
// set an option to valid values
assert!(db
.set_options(&[("disable_auto_compactions", "true")])
.is_ok());
assert!(db
.set_options(&[("disable_auto_compactions", "false")])
.is_ok());
// invalid names/values should result in an error
assert!(db
.set_options(&[("disable_auto_compactions", "INVALID_VALUE")])
.is_err());
assert!(db
.set_options(&[("INVALID_NAME", "INVALID_VALUE")])
.is_err());
// option names/values must not contain NULLs
assert!(db
.set_options(&[("disable_auto_compactions", "true\0")])
.is_err());
assert!(db
.set_options(&[("disable_auto_compactions\0", "true")])
.is_err());
// empty options are not allowed
assert!(db.set_options(&[]).is_err());
// multiple options can be set in a single API call
let multiple_options = [
("paranoid_file_checks", "true"),
("report_bg_io_stats", "true"),
];
db.set_options(&multiple_options).unwrap();
}
}