From 473b1671c75153502e980808d66cdeb3da7b7f73 Mon Sep 17 00:00:00 2001 From: Gary Guo Date: Wed, 6 Nov 2019 07:21:15 +0000 Subject: [PATCH] Always use pinnable slice for get operations (#345) --- src/db.rs | 186 ++++++++---------------------------- src/lib.rs | 6 +- src/merge_operator.rs | 10 +- tests/test_backup.rs | 4 +- tests/test_column_family.rs | 10 +- tests/test_db.rs | 34 +++---- 6 files changed, 66 insertions(+), 184 deletions(-) diff --git a/src/db.rs b/src/db.rs index bc80d98..a6e2d5f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -690,7 +690,7 @@ impl<'a> Snapshot<'a> { DBRawIterator::new_cf(self.db, cf_handle, &readopts) } - pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> { + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> { let readopts = ReadOptions::default(); self.get_opt(key, readopts) } @@ -699,7 +699,7 @@ impl<'a> Snapshot<'a> { &self, cf: &ColumnFamily, key: K, - ) -> Result<Option<DBVector>, Error> { + ) -> Result<Option<Vec<u8>>, Error> { let readopts = ReadOptions::default(); self.get_cf_opt(cf, key.as_ref(), readopts) } @@ -708,7 +708,7 @@ impl<'a> Snapshot<'a> { &self, key: K, mut readopts: ReadOptions, - ) -> Result<Option<DBVector>, Error> { + ) -> Result<Option<Vec<u8>>, Error> { readopts.set_snapshot(self); self.db.get_opt(key.as_ref(), &readopts) } @@ -718,7 +718,7 @@ impl<'a> Snapshot<'a> { cf: &ColumnFamily, key: K, mut readopts: ReadOptions, - ) -> Result<Option<DBVector>, Error> { + ) -> Result<Option<Vec<u8>>, Error> { readopts.set_snapshot(self); self.db.get_cf_opt(cf, key.as_ref(), &readopts) } @@ -941,88 +941,46 @@ impl DB { self.write_opt(batch, &wo) } + /// Return the bytes associated with a key value with read options. If you only intend to use + /// the vector returned temporarily, consider using [`get_pinned_opt`](#method.get_pinned_opt) + /// to avoid unnecessary memory copy. pub fn get_opt<K: AsRef<[u8]>>( &self, key: K, readopts: &ReadOptions, - ) -> Result<Option<DBVector>, Error> { - if readopts.inner.is_null() { - return Err(Error::new( - "Unable to create RocksDB read options. 
\ - This is a fairly trivial call, and its \ - failure may be indicative of a \ - mis-compiled or mis-loaded RocksDB \ - library." - .to_owned(), - )); - } - - let key = key.as_ref(); - - unsafe { - let mut val_len: size_t = 0; - let val = ffi_try!(ffi::rocksdb_get( - self.inner, - readopts.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - &mut val_len, - )) as *mut u8; - if val.is_null() { - Ok(None) - } else { - Ok(Some(DBVector::from_c(val, val_len))) - } - } + ) -> Result<Option<Vec<u8>>, Error> { + self.get_pinned_opt(key, readopts) + .map(|x| x.map(|v| v.as_ref().to_vec())) } - /// Return the bytes associated with a key value - pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<DBVector>, Error> { + /// Return the bytes associated with a key value. If you only intend to use the vector returned + /// temporarily, consider using [`get_pinned`](#method.get_pinned) to avoid unnecessary memory + /// copy. + pub fn get<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<Vec<u8>>, Error> { self.get_opt(key.as_ref(), &ReadOptions::default()) } + /// Return the bytes associated with a key value and the given column family with read options. + /// If you only intend to use the vector returned temporarily, consider using + /// [`get_pinned_cf_opt`](#method.get_pinned_cf_opt) to avoid unnecessary memory. pub fn get_cf_opt<K: AsRef<[u8]>>( &self, cf: &ColumnFamily, key: K, readopts: &ReadOptions, - ) -> Result<Option<DBVector>, Error> { - if readopts.inner.is_null() { - return Err(Error::new( - "Unable to create RocksDB read options. \ - This is a fairly trivial call, and its \ - failure may be indicative of a \ - mis-compiled or mis-loaded RocksDB \ - library." 
- .to_owned(), - )); - } - - let key = key.as_ref(); - - unsafe { - let mut val_len: size_t = 0; - let val = ffi_try!(ffi::rocksdb_get_cf( - self.inner, - readopts.inner, - cf.inner, - key.as_ptr() as *const c_char, - key.len() as size_t, - &mut val_len, - )) as *mut u8; - if val.is_null() { - Ok(None) - } else { - Ok(Some(DBVector::from_c(val, val_len))) - } - } + ) -> Result<Option<Vec<u8>>, Error> { + self.get_pinned_cf_opt(cf, key, readopts) + .map(|x| x.map(|v| v.as_ref().to_vec())) } + /// Return the bytes associated with a key value and the given column family. If you only + /// intend to use the vector returned temporarily, consider using + /// [`get_pinned_cf`](#method.get_pinned_cf) to avoid unnecessary memory. pub fn get_cf<K: AsRef<[u8]>>( &self, cf: &ColumnFamily, key: K, - ) -> Result<Option<DBVector>, Error> { + ) -> Result<Option<Vec<u8>>, Error> { self.get_cf_opt(cf, key.as_ref(), &ReadOptions::default()) } @@ -2020,69 +1978,6 @@ unsafe impl Send for ReadOptions {} unsafe impl<'a> Sync for DBRawIterator<'a> {} unsafe impl Sync for ReadOptions {} -/// Vector of bytes stored in the database. -/// -/// This is a `C` allocated byte array and a length value. -/// Normal usage would be to utilize the fact it implements `Deref<[u8]>` and use it as -/// a slice. -pub struct DBVector { - base: *mut u8, - len: usize, -} - -impl Deref for DBVector { - type Target = [u8]; - - fn deref(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.base, self.len) } - } -} - -impl AsRef<[u8]> for DBVector { - fn as_ref(&self) -> &[u8] { - // Implement this via Deref so as not to repeat ourselves - &*self - } -} - -impl Drop for DBVector { - fn drop(&mut self) { - unsafe { - libc::free(self.base as *mut c_void); - } - } -} - -impl DBVector { - /// Used internally to create a DBVector from a `C` memory block - /// - /// # Unsafe - /// Requires that the ponter be allocated by a `malloc` derivative (all C libraries), and - /// `val_len` be the length of the C array to be safe (since `sizeof(u8) = 1`). 
- /// - /// # Example - /// - /// ```ignore - /// let buf_len: libc::size_t = unsafe { mem::uninitialized() }; - /// // Assume the function fills buf_len with the length of the returned array - /// let buf: *mut u8 = unsafe { ffi_function_returning_byte_array(&buf_len) }; - /// DBVector::from_c(buf, buf_len) - /// ``` - pub unsafe fn from_c(val: *mut u8, val_len: size_t) -> DBVector { - DBVector { - base: val, - len: val_len as usize, - } - } - - /// Convenience function to attempt to reinterperet value as string. - /// - /// implemented as `str::from_utf8(&self[..])` - pub fn to_utf8(&self) -> Option<&str> { - str::from_utf8(self.deref()).ok() - } -} - fn to_cpath<P: AsRef<Path>>(path: P) -> Result<CString, Error> { match CString::new(path.as_ref().to_string_lossy().as_bytes()) { Ok(c) => Ok(c), @@ -2102,6 +1997,15 @@ pub struct DBPinnableSlice<'a> { db: PhantomData<&'a DB>, } +// Safety note: auto-implementing Send on most db-related types is prevented by the inner FFI +// pointer. In most cases, however, this pointer is Send-safe because it is never aliased and +// rocksdb internally does not rely on thread-local information for its user-exposed types. 
+unsafe impl<'a> Send for DBPinnableSlice<'a> {} + +// Sync is similarly safe for many types because they do not expose interior mutability, and their +// use within the rocksdb library is generally behind a const reference +unsafe impl<'a> Sync for DBPinnableSlice<'a> {} + impl<'a> AsRef<[u8]> for DBPinnableSlice<'a> { fn as_ref(&self) -> &[u8] { // Implement this via Deref so as not to repeat ourselves @@ -2134,7 +2038,7 @@ impl<'a> DBPinnableSlice<'a> { /// /// # Unsafe /// Requires that the pointer must be generated by rocksdb_get_pinned - pub unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { + unsafe fn from_c(ptr: *mut ffi::rocksdb_pinnableslice_t) -> DBPinnableSlice<'a> { DBPinnableSlice { ptr, db: PhantomData, @@ -2142,16 +2046,6 @@ impl<'a> DBPinnableSlice<'a> { } } -#[test] -fn test_db_vector() { - use std::mem; - let len: size_t = 4; - let data = unsafe { libc::calloc(len, mem::size_of::<u8>()) as *mut u8 }; - let v = unsafe { DBVector::from_c(data, len) }; - let ctrl = [0u8, 0, 0, 0]; - assert_eq!(&*v, &ctrl[..]); -} - #[test] fn external() { let path = "_rust_rocksdb_externaltest"; @@ -2159,8 +2053,8 @@ fn external() { let db = DB::open_default(path).unwrap(); let p = db.put(b"k1", b"v1111"); assert!(p.is_ok()); - let r: Result<Option<DBVector>, Error> = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); + let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"v1111"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } @@ -2209,8 +2103,8 @@ fn writebatch_works() { assert!(db.get(b"k1").unwrap().is_none()); let p = db.write(batch); assert!(p.is_ok()); - let r: Result<Option<DBVector>, Error> = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); + let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"v1111"); } { // test delete @@ -2313,8 +2207,8 @@ fn snapshot_test() { assert!(p.is_ok()); let snap = db.snapshot(); - let r: Result<Option<DBVector>, Error> 
= snap.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); + let r: Result<Option<Vec<u8>>, Error> = snap.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"v1111"); let p = db.put(b"k2", b"v2222"); assert!(p.is_ok()); diff --git a/src/lib.rs b/src/lib.rs index 7e235ab..ca8f25a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -25,7 +25,7 @@ //! let db = DB::open_default(path).unwrap(); //! db.put(b"my key", b"my value").unwrap(); //! match db.get(b"my key") { -//! Ok(Some(value)) => println!("retrieved value {}", value.to_utf8().unwrap()), +//! Ok(Some(value)) => println!("retrieved value {}", String::from_utf8(value).unwrap()), //! Ok(None) => println!("value not found"), //! Err(e) => println!("operational problem encountered: {}", e), //! } @@ -72,8 +72,8 @@ mod slice_transform; pub use compaction_filter::Decision as CompactionDecision; pub use db::{ DBCompactionStyle, DBCompressionType, DBIterator, DBPinnableSlice, DBRawIterator, - DBRecoveryMode, DBVector, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, - WriteBatch, WriteBatchIterator, + DBRecoveryMode, DBWALIterator, Direction, IteratorMode, ReadOptions, Snapshot, WriteBatch, + WriteBatchIterator, }; pub use slice_transform::SliceTransform; diff --git a/src/merge_operator.rs b/src/merge_operator.rs index 3610b96..9e5d7cf 100644 --- a/src/merge_operator.rs +++ b/src/merge_operator.rs @@ -50,7 +50,7 @@ //! db.merge(b"k1", b"d"); //! db.merge(b"k1", b"efg"); //! let r = db.get(b"k1"); -//! assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefg"); +//! assert_eq!(r.unwrap().unwrap(), b"abcdefg"); //! } //! let _ = DB::destroy(&opts, path); //! 
} @@ -242,9 +242,9 @@ mod test { let m = db.merge(b"k1", b"h"); assert!(m.is_ok()); match db.get(b"k1") { - Ok(Some(value)) => match value.to_utf8() { - Some(v) => println!("retrieved utf8 value: {}", v), - None => println!("did not read valid utf-8 out of the db"), + Ok(Some(value)) => match std::str::from_utf8(&value) { + Ok(v) => println!("retrieved utf8 value: {}", v), + Err(_) => println!("did not read valid utf-8 out of the db"), }, Err(_) => println!("error reading value"), _ => panic!("value not present"), @@ -252,7 +252,7 @@ mod test { assert!(m.is_ok()); let r = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "abcdefgh"); + assert_eq!(r.unwrap().unwrap(), b"abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } diff --git a/tests/test_backup.rs b/tests/test_backup.rs index 4addfbe..e9af2c1 100644 --- a/tests/test_backup.rs +++ b/tests/test_backup.rs @@ -30,7 +30,7 @@ fn backup_restore() { let db = DB::open(&opts, path).unwrap(); assert!(db.put(b"k1", b"v1111").is_ok()); let value = db.get(b"k1"); - assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111"); + assert_eq!(value.unwrap().unwrap(), b"v1111"); { let backup_path = "_rust_rocksdb_backup_path"; let backup_opts = BackupEngineOptions::default(); @@ -48,7 +48,7 @@ fn backup_restore() { let db_restore = DB::open_default(restore_path).unwrap(); let value = db_restore.get(b"k1"); - assert_eq!(value.unwrap().unwrap().as_ref(), b"v1111"); + assert_eq!(value.unwrap().unwrap(), b"v1111"); } } assert!(DB::destroy(&opts, restore_path).is_ok()); diff --git a/tests/test_column_family.rs b/tests/test_column_family.rs index 354aaf0..503ed9e 100644 --- a/tests/test_column_family.rs +++ b/tests/test_column_family.rs @@ -146,7 +146,7 @@ fn test_merge_operator() { }; let cf1 = db.cf_handle("cf1").unwrap(); assert!(db.put_cf(cf1, b"k1", b"v1").is_ok()); - assert!(db.get_cf(cf1, b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1"); + assert_eq!(db.get_cf(cf1, 
b"k1").unwrap().unwrap(), b"v1"); let p = db.put_cf(cf1, b"k1", b"a"); assert!(p.is_ok()); db.merge_cf(cf1, b"k1", b"b").unwrap(); @@ -157,16 +157,16 @@ fn test_merge_operator() { println!("m is {:?}", m); // TODO assert!(m.is_ok()); match db.get(b"k1") { - Ok(Some(value)) => match value.to_utf8() { - Some(v) => println!("retrieved utf8 value: {}", v), - None => println!("did not read valid utf-8 out of the db"), + Ok(Some(value)) => match std::str::from_utf8(&value) { + Ok(v) => println!("retrieved utf8 value: {}", v), + Err(_) => println!("did not read valid utf-8 out of the db"), }, Err(_) => println!("error reading value"), _ => panic!("value not present!"), } let _ = db.get_cf(cf1, b"k1"); - // TODO assert!(r.unwrap().to_utf8().unwrap() == "abcdefgh"); + // TODO assert!(r.unwrap().as_ref() == b"abcdefgh"); assert!(db.delete(b"k1").is_ok()); assert!(db.get(b"k1").unwrap().is_none()); } diff --git a/tests/test_db.rs b/tests/test_db.rs index 0e6ae38..38a1d72 100644 --- a/tests/test_db.rs +++ b/tests/test_db.rs @@ -17,23 +17,11 @@ extern crate rocksdb; mod util; -use libc::size_t; - -use rocksdb::{DBVector, Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; +use rocksdb::{Error, IteratorMode, Options, Snapshot, WriteBatch, DB}; use std::sync::Arc; use std::{mem, thread}; use util::DBPath; -#[test] -fn test_db_vector() { - use std::mem; - let len: size_t = 4; - let data: *mut u8 = unsafe { libc::calloc(len, mem::size_of::<u8>()) as *mut u8 }; - let v = unsafe { DBVector::from_c(data, len) }; - let ctrl = [0u8, 0, 0, 0]; - assert_eq!(&*v, &ctrl[..]); -} - #[test] fn external() { let path = DBPath::new("_rust_rocksdb_externaltest"); @@ -43,9 +31,9 @@ fn external() { assert!(db.put(b"k1", b"v1111").is_ok()); - let r: Result<Option<DBVector>, Error> = db.get(b"k1"); + let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); + assert_eq!(r.unwrap().unwrap(), b"v1111"); assert!(db.delete(b"k1").is_ok()); 
assert!(db.get(b"k1").unwrap().is_none()); } @@ -60,7 +48,7 @@ fn db_vector_as_ref_byte_slice() { assert!(db.put(b"k1", b"v1111").is_ok()); - let r: Result<Option<DBVector>, Error> = db.get(b"k1"); + let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1"); let vector = r.unwrap().unwrap(); assert!(get_byte_slice(&vector) == b"v1111"); @@ -104,8 +92,8 @@ fn writebatch_works() { assert!(!batch.is_empty()); assert!(db.get(b"k1").unwrap().is_none()); assert!(db.write(batch).is_ok()); - let r: Result<Option<DBVector>, Error> = db.get(b"k1"); - assert!(r.unwrap().unwrap().to_utf8().unwrap() == "v1111"); + let r: Result<Option<Vec<u8>>, Error> = db.get(b"k1"); + assert_eq!(r.unwrap().unwrap(), b"v1111"); } { // test delete @@ -156,7 +144,7 @@ fn snapshot_test() { assert!(db.put(b"k1", b"v1111").is_ok()); let snap = db.snapshot(); - assert!(snap.get(b"k1").unwrap().unwrap().to_utf8().unwrap() == "v1111"); + assert_eq!(snap.get(b"k1").unwrap().unwrap(), b"v1111"); assert!(db.put(b"k2", b"v2222").is_ok()); @@ -177,11 +165,11 @@ impl SnapshotWrapper { } } - fn check<K>(&self, key: K, value: &str) -> bool + fn check<K>(&self, key: K, value: &[u8]) -> bool where K: AsRef<[u8]>, { - self.snapshot.get(key).unwrap().unwrap().to_utf8().unwrap() == value + self.snapshot.get(key).unwrap().unwrap() == value } } @@ -195,10 +183,10 @@ fn sync_snapshot_test() { let wrapper = SnapshotWrapper::new(&db); let wrapper_1 = wrapper.clone(); - let handler_1 = thread::spawn(move || wrapper_1.check("k1", "v1")); + let handler_1 = thread::spawn(move || wrapper_1.check("k1", b"v1")); let wrapper_2 = wrapper.clone(); - let handler_2 = thread::spawn(move || wrapper_2.check("k2", "v2")); + let handler_2 = thread::spawn(move || wrapper_2.check("k2", b"v2")); assert!(handler_1.join().unwrap()); assert!(handler_2.join().unwrap());