Always use pinnable slice for get operations (#345)

master
Gary Guo 5 years ago committed by Oleksandr Anyshchenko
parent 9b8486aa44
commit 473b1671c7
  1. 186
      src/db.rs
  2. 6
      src/lib.rs
  3. 10
      src/merge_operator.rs
  4. 4
      tests/test_backup.rs
  5. 10
      tests/test_column_family.rs
  6. 34
      tests/test_db.rs

@ -690,7 +690,7 @@ impl<'a> Snapshot<'a> {
DBRawIterator::new_cf(self.db, cf_handle, &readopts) DBRawIterator::new_cf(self.db, cf_handle, &readopts)
} }
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> { pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
let readopts = ReadOptions::default(); let readopts = ReadOptions::default();
self.get_opt(key, readopts) self.get_opt(key, readopts)
} }
@ -699,7 +699,7 @@ impl<'a> Snapshot<'a> {
&self, &self,
cf: &ColumnFamily, cf: &ColumnFamily,
key: K, key: K,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
let readopts = ReadOptions::default(); let readopts = ReadOptions::default();
self.get_cf_opt(cf, key.as_ref(), readopts) self.get_cf_opt(cf, key.as_ref(), readopts)
} }
@ -708,7 +708,7 @@ impl<'a> Snapshot<'a> {
&self, &self,
key: K, key: K,
mut readopts: ReadOptions, mut readopts: ReadOptions,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
readopts.set_snapshot(self); readopts.set_snapshot(self);
self.db.get_opt(key.as_ref(), &readopts) self.db.get_opt(key.as_ref(), &readopts)
} }
@ -718,7 +718,7 @@ impl<'a> Snapshot<'a> {
cf: &ColumnFamily, cf: &ColumnFamily,
key: K, key: K,
mut readopts: ReadOptions, mut readopts: ReadOptions,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
readopts.set_snapshot(self); readopts.set_snapshot(self);
self.db.get_cf_opt(cf, key.as_ref(), &readopts) self.db.get_cf_opt(cf, key.as_ref(), &readopts)
} }
@ -941,88 +941,46 @@ impl DB {
self.write_opt(batch, &wo) self.write_opt(batch, &wo)
} }
/// Return the bytes associated with a key value with read options. If you only intend to use
/// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt)
/// to avoid unnecessary memory copy.
pub fn get_opt<K: AsRef<[u8]>>( pub fn get_opt<K: AsRef<[u8]>>(
&self, &self,
key: K, key: K,
readopts: &ReadOptions, readopts: &ReadOptions,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
if readopts.inner.is_null() { self.get_pinned_opt(key, readopts)
return Err(Error::new( .map(|x| x.map(|v| v.as_ref().to_vec()))
"Unable to create RocksDB read options. \
This is a fairly trivial call, and its \
failure may be indicative of a \
mis-compiled or mis-loaded RocksDB \
library."
.to_owned(),
));
}
let key = key.as_ref();
unsafe {
let mut val_len: size_t = 0;
let val = ffi_try!(ffi::rocksdb_get(
self.inner,
readopts.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len,
)) as *mut u8;
if val.is_null() {
Ok(None)
} else {
Ok(Some(DBVector::from_c(val, val_len)))
}
}
} }
/// Return the bytes associated with a key value /// Return the bytes associated with a key value. If you only intend to use the vector returned
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> { /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory
/// copy.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> {
self.get_opt(key.as_ref(), &ReadOptions::default()) self.get_opt(key.as_ref(), &ReadOptions::default())
} }
/// Return the bytes associated with a key value and the given column family with read options.
/// If you only intend to use the vector returned temporarily, consider using
/// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory copy.
pub fn get_cf_opt<K: AsRef<[u8]>>( pub fn get_cf_opt<K: AsRef<[u8]>>(
&self, &self,
cf: &ColumnFamily, cf: &ColumnFamily,
key: K, key: K,
readopts: &ReadOptions, readopts: &ReadOptions,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
if readopts.inner.is_null() { self.get_pinned_cf_opt(cf, key, readopts)
return Err(Error::new( .map(|x| x.map(|v| v.as_ref().to_vec()))
"Unable to create RocksDB read options. \
This is a fairly trivial call, and its \
failure may be indicative of a \
mis-compiled or mis-loaded RocksDB \
library."
.to_owned(),
));
}
let key = key.as_ref();
unsafe {
let mut val_len: size_t = 0;
let val = ffi_try!(ffi::rocksdb_get_cf(
self.inner,
readopts.inner,
cf.inner,
key.as_ptr() as *const c_char,
key.len() as size_t,
&mut val_len,
)) as *mut u8;
if val.is_null() {
Ok(None)
} else {
Ok(Some(DBVector::from_c(val, val_len)))
}
}
} }
/// Return the bytes associated with a key value and the given column family. If you only
/// intend to use the vector returned temporarily, consider using
/// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory copy.
pub fn get_cf<K: AsRef<[u8]>>( pub fn get_cf<K: AsRef<[u8]>>(
&self, &self,
cf: &ColumnFamily, cf: &ColumnFamily,
key: K, key: K,
) -> Result<Option<DBVector>, Error> { ) -> Result<Option<Vec<u8>>, Error> {
self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default()) self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default())
} }
@ -2020,69 +1978,6 @@ unsafe impl Send for ReadOptions {}
unsafe impl<'a> Sync for DBRawIterator<'a> {} unsafe impl<'a> Sync for DBRawIterator<'a> {}
unsafe impl Sync for ReadOptions {} unsafe impl Sync for ReadOptions {}
/// Vector of bytes stored in the database.
///
/// This is a `C` allocated byte array and a length value.
/// Normal usage would be to utilize the fact it implements `Deref<[u8]>` and use it as
/// a slice.
pub struct DBVector {
base: *mut u8,
len: usize,
}
impl Deref for DBVector {
type Target = [u8];
fn deref(&self) -> &[u8] {
unsafe { slice::from_raw_parts(self.base, self.len) }
}
}
impl AsRef<[u8]> for DBVector {
fn as_ref(&self) -> &[u8] {
// Implement this via Deref so as not to repeat ourselves
&*self
}
}
impl Drop for DBVector {
fn drop(&mut self) {
unsafe {
libc::free(self.base as *mut c_void);
}
}
}
impl DBVector {
/// Used internally to create a DBVector from a `C` memory block
///
/// # Unsafe
/// Requires that the pointer be allocated by a `malloc` derivative (all C libraries), and
/// `val_len` be the length of the C array to be safe (since `sizeof(u8) = 1`).
///
/// # Example
///
/// ```ignore
/// let buf_len: libc::size_t = unsafe { mem::uninitialized() };
/// // Assume the function fills buf_len with the length of the returned array
/// let buf: *mut u8 = unsafe { ffi_function_returning_byte_array(&buf_len) };
/// DBVector::from_c(buf, buf_len)
/// ```
pub unsafe fn from_c(val: *mut u8, val_len: size_t) -> DBVector {
DBVector {
base: val,
len: val_len as usize,
}
}
/// Convenience function to attempt to reinterpret value as string.
///
/// implemented as `str::from_utf8(&self[..])`
pub fn to_utf8(&self) -> Option<&str> {
str::from_utf8(self.deref()).ok()
}
}
fn to_cpath<P: AsRef<Path>>(path: P) -> Result<CString, Error> { fn to_cpath<P: AsRef<Path>>(path: P) -> Result<CString, Error> {
match CString::new(path.as_ref().to_string_lossy().as_bytes()) { match CString::new(path.as_ref().to_string_lossy().as_bytes()) {
Ok(c) => Ok(c), Ok(c) => Ok(c),
@ -2102,6 +1997,15 @@ pub struct DBPinnableSlice<'a> {
db: PhantomData<&'a DB>, db: PhantomData<&'a DB>,
} }
// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI
// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and
// rocksdb internally does not rely on thread-local information for its user-exposed types.
unsafe impl<'a> Send for DBPinnableSlice<'a> {}
// Sync is similarly safe for many types because they do not expose interior mutability, and their
// use within the rocksdb library is generally behind a const reference
unsafe impl<'a> Sync for DBPinnableSlice<'a> {}
impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> { impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> {
fn as_ref(&self) -> &[u8] { fn as_ref(&self) -> &[u8] {
// Implement this via Deref so as not to repeat ourselves // Implement this via Deref so as not to repeat ourselves
@ -2134,7 +2038,7 @@ impl<'a> DBPinnableSlice<'a> {
/// ///
/// # Unsafe /// # Unsafe
/// Requires that the pointer must be generated by rocksdb_get_pinned /// Requires that the pointer must be generated by rocksdb_get_pinned
pub unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> {
DBPinnableSlice { DBPinnableSlice {
ptr, ptr,
db: PhantomData, db: PhantomData,
@ -2142,16 +2046,6 @@ impl<'a> DBPinnableSlice<'a> {
} }
} }
#[test]
fn test_db_vector() {
use std::mem;
let len: size_t = 4;
let data = unsafe { libc::calloc(len, mem::size_of::<u8>()) as *mut u8 };
let v = unsafe { DBVector::from_c(data, len) };
let ctrl = [0u8, 0, 0, 0];
assert_eq!(&*v, &ctrl[..]);
}
#[test] #[test]
fn external() { fn external() {
let path = "_rust_rocksdb_externaltest"; let path = "_rust_rocksdb_externaltest";
@ -2159,8 +2053,8 @@ fn external() {
let db = DB::open_default(path).unwrap(); let db = DB::open_default(path).unwrap();
let p = db.put(b"k1", b"v1111"); let p = db.put(b"k1", b"v1111");
assert!(p.is_ok()); assert!(p.is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(r.unwrap().unwrap(), b"v1111");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
@ -2209,8 +2103,8 @@ fn writebatch_works() {
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
let p = db.write(batch); let p = db.write(batch);
assert!(p.is_ok()); assert!(p.is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(r.unwrap().unwrap(), b"v1111");
} }
{ {
// test delete // test delete
@ -2313,8 +2207,8 @@ fn snapshot_test() {
assert!(p.is_ok()); assert!(p.is_ok());
let snap = db.snapshot(); let snap = db.snapshot();
let r: Result<Option<DBVector>, Error> = snap.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = snap.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(r.unwrap().unwrap(), b"v1111");
let p = db.put(b"k2", b"v2222"); let p = db.put(b"k2", b"v2222");
assert!(p.is_ok()); assert!(p.is_ok());

@ -25,7 +25,7 @@
//! let db = DB::open_default(path).unwrap(); //! let db = DB::open_default(path).unwrap();
//! db.put(b"my key", b"my value").unwrap(); //! db.put(b"my key", b"my value").unwrap();
//! match db.get(b"my key") { //! match db.get(b"my key") {
//! Ok(Some(value)) => println!("retrieved value {}", value.to_utf8().unwrap()), //! Ok(Some(value)) => println!("retrieved value {}", String::from_utf8(value).unwrap()),
//! Ok(None) => println!("value not found"), //! Ok(None) => println!("value not found"),
//! Err(e) => println!("operational problem encountered: {}", e), //! Err(e) => println!("operational problem encountered: {}", e),
//! } //! }
@ -72,8 +72,8 @@ mod slice_transform;
pub use compaction_filter::Decision as CompactionDecision; pub use compaction_filter::Decision as CompactionDecision;
pub use db::{ pub use db::{
DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator, DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator,
DBRecoveryMode, DBVector, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, DBRecoveryMode, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch,
WriteBatch, WriteBatchIterator, WriteBatchIterator,
}; };
pub use slice_transform::SliceTransform; pub use slice_transform::SliceTransform;

@ -50,7 +50,7 @@
//! db.merge(b"k1", b"d"); //! db.merge(b"k1", b"d");
//! db.merge(b"k1", b"efg"); //! db.merge(b"k1", b"efg");
//! let r = db.get(b"k1"); //! let r = db.get(b"k1");
//! assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg"); //! assert_eq!(r.unwrap().unwrap(), b"abcdefg");
//! } //! }
//! let _ = DB::destroy(&opts, path); //! let _ = DB::destroy(&opts, path);
//! } //! }
@ -242,9 +242,9 @@ mod test {
let m = db.merge(b"k1", b"h"); let m = db.merge(b"k1", b"h");
assert!(m.is_ok()); assert!(m.is_ok());
match db.get(b"k1") { match db.get(b"k1") {
Ok(Some(value)) => match value.to_utf8() { Ok(Some(value)) => match std::str::from_utf8(&value) {
Some(v) => println!("retrieved utf8 value: {}", v), Ok(v) => println!("retrieved utf8 value: {}", v),
None => println!("did not read valid utf-8 out of the db"), Err(_) => println!("did not read valid utf-8 out of the db"),
}, },
Err(_) => println!("error reading value"), Err(_) => println!("error reading value"),
_ => panic!("value not present"), _ => panic!("value not present"),
@ -252,7 +252,7 @@ mod test {
assert!(m.is_ok()); assert!(m.is_ok());
let r = db.get(b"k1"); let r = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); assert_eq!(r.unwrap().unwrap(), b"abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }

@ -30,7 +30,7 @@ fn backup_restore() {
let db = DB::open(&opts, path).unwrap(); let db = DB::open(&opts, path).unwrap();
assert!(db.put(b"k1", b"v1111").is_ok()); assert!(db.put(b"k1", b"v1111").is_ok());
let value = db.get(b"k1"); let value = db.get(b"k1");
assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111"); assert_eq!(value.unwrap().unwrap(), b"v1111");
{ {
let backup_path = "_rust_rocksdb_backup_path"; let backup_path = "_rust_rocksdb_backup_path";
let backup_opts = BackupEngineOptions::default(); let backup_opts = BackupEngineOptions::default();
@ -48,7 +48,7 @@ fn backup_restore() {
let db_restore = DB::open_default(restore_path).unwrap(); let db_restore = DB::open_default(restore_path).unwrap();
let value = db_restore.get(b"k1"); let value = db_restore.get(b"k1");
assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111"); assert_eq!(value.unwrap().unwrap(), b"v1111");
} }
} }
assert!(DB::destroy(&opts, restore_path).is_ok()); assert!(DB::destroy(&opts, restore_path).is_ok());

@ -146,7 +146,7 @@ fn test_merge_operator() {
}; };
let cf1 = db.cf_handle("cf1").unwrap(); let cf1 = db.cf_handle("cf1").unwrap();
assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok());
assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1"); assert_eq!(db.get_cf(cf1, b"k1").unwrap().unwrap(), b"v1");
let p = db.put_cf(cf1, b"k1", b"a"); let p = db.put_cf(cf1, b"k1", b"a");
assert!(p.is_ok()); assert!(p.is_ok());
db.merge_cf(cf1, b"k1", b"b").unwrap(); db.merge_cf(cf1, b"k1", b"b").unwrap();
@ -157,16 +157,16 @@ fn test_merge_operator() {
println!("m is {:?}", m); println!("m is {:?}", m);
// TODO assert!(m.is_ok()); // TODO assert!(m.is_ok());
match db.get(b"k1") { match db.get(b"k1") {
Ok(Some(value)) => match value.to_utf8() { Ok(Some(value)) => match std::str::from_utf8(&value) {
Some(v) => println!("retrieved utf8 value: {}", v), Ok(v) => println!("retrieved utf8 value: {}", v),
None => println!("did not read valid utf-8 out of the db"), Err(_) => println!("did not read valid utf-8 out of the db"),
}, },
Err(_) => println!("error reading value"), Err(_) => println!("error reading value"),
_ => panic!("value not present!"), _ => panic!("value not present!"),
} }
let _ = db.get_cf(cf1, b"k1"); let _ = db.get_cf(cf1, b"k1");
// TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); // TODO assert!(r.unwrap().as_ref() == b"abcdefgh");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }

@ -17,23 +17,11 @@ extern crate rocksdb;
mod util; mod util;
use libc::size_t; use rocksdb::{Error, IteratorMode, Options, Snapshot, WriteBatch, DB};
use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB};
use std::sync::Arc; use std::sync::Arc;
use std::{mem, thread}; use std::{mem, thread};
use util::DBPath; use util::DBPath;
#[test]
fn test_db_vector() {
use std::mem;
let len: size_t = 4;
let data: *mut u8 = unsafe { libc::calloc(len, mem::size_of::<u8>()) as *mut u8 };
let v = unsafe { DBVector::from_c(data, len) };
let ctrl = [0u8, 0, 0, 0];
assert_eq!(&*v, &ctrl[..]);
}
#[test] #[test]
fn external() { fn external() {
let path = DBPath::new("_rust_rocksdb_externaltest"); let path = DBPath::new("_rust_rocksdb_externaltest");
@ -43,9 +31,9 @@ fn external() {
assert!(db.put(b"k1", b"v1111").is_ok()); assert!(db.put(b"k1", b"v1111").is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(r.unwrap().unwrap(), b"v1111");
assert!(db.delete(b"k1").is_ok()); assert!(db.delete(b"k1").is_ok());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
} }
@ -60,7 +48,7 @@ fn db_vector_as_ref_byte_slice() {
assert!(db.put(b"k1", b"v1111").is_ok()); assert!(db.put(b"k1", b"v1111").is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
let vector = r.unwrap().unwrap(); let vector = r.unwrap().unwrap();
assert!(get_byte_slice(&vector) == b"v1111"); assert!(get_byte_slice(&vector) == b"v1111");
@ -104,8 +92,8 @@ fn writebatch_works() {
assert!(!batch.is_empty()); assert!(!batch.is_empty());
assert!(db.get(b"k1").unwrap().is_none()); assert!(db.get(b"k1").unwrap().is_none());
assert!(db.write(batch).is_ok()); assert!(db.write(batch).is_ok());
let r: Result<Option<DBVector>, Error> = db.get(b"k1"); let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1");
assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(r.unwrap().unwrap(), b"v1111");
} }
{ {
// test delete // test delete
@ -156,7 +144,7 @@ fn snapshot_test() {
assert!(db.put(b"k1", b"v1111").is_ok()); assert!(db.put(b"k1", b"v1111").is_ok());
let snap = db.snapshot(); let snap = db.snapshot();
assert!(snap.get(b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1111"); assert_eq!(snap.get(b"k1").unwrap().unwrap(), b"v1111");
assert!(db.put(b"k2", b"v2222").is_ok()); assert!(db.put(b"k2", b"v2222").is_ok());
@ -177,11 +165,11 @@ impl SnapshotWrapper {
} }
} }
fn check<K>(&self, key: K, value: &str) -> bool fn check<K>(&self, key: K, value: &[u8]) -> bool
where where
K: AsRef<[u8]>, K: AsRef<[u8]>,
{ {
self.snapshot.get(key).unwrap().unwrap().to_utf8().unwrap() == value self.snapshot.get(key).unwrap().unwrap() == value
} }
} }
@ -195,10 +183,10 @@ fn sync_snapshot_test() {
let wrapper = SnapshotWrapper::new(&db); let wrapper = SnapshotWrapper::new(&db);
let wrapper_1 = wrapper.clone(); let wrapper_1 = wrapper.clone();
let handler_1 = thread::spawn(move || wrapper_1.check("k1", "v1")); let handler_1 = thread::spawn(move || wrapper_1.check("k1", b"v1"));
let wrapper_2 = wrapper.clone(); let wrapper_2 = wrapper.clone();
let handler_2 = thread::spawn(move || wrapper_2.check("k2", "v2")); let handler_2 = thread::spawn(move || wrapper_2.check("k2", b"v2"));
assert!(handler_1.join().unwrap()); assert!(handler_1.join().unwrap());
assert!(handler_2.join().unwrap()); assert!(handler_2.join().unwrap());

Loading…
Cancel
Save