|
|
|
@ -1,6 +1,6 @@ |
|
|
|
|
use libc::{c_uint, c_void, size_t}; |
|
|
|
|
use std::{mem, ptr, slice}; |
|
|
|
|
use std::marker::{PhantomData, PhantomFn} ; |
|
|
|
|
use std::marker::PhantomData ; |
|
|
|
|
|
|
|
|
|
use ffi; |
|
|
|
|
|
|
|
|
@ -13,17 +13,13 @@ use flags::{DatabaseFlags, EnvironmentFlags, WriteFlags}; |
|
|
|
|
/// An LMDB transaction.
|
|
|
|
|
///
|
|
|
|
|
/// All database operations require a transaction.
|
|
|
|
|
pub trait Transaction<'env> : PhantomFn<(), &'env Environment> { |
|
|
|
|
pub trait Transaction : Sized { |
|
|
|
|
|
|
|
|
|
/// Returns a raw pointer to the underlying LMDB transaction.
|
|
|
|
|
///
|
|
|
|
|
/// The caller **must** ensure that the pointer is not used after the lifetime of the
|
|
|
|
|
/// transaction.
|
|
|
|
|
fn txn(&self) -> *mut ffi::MDB_txn; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Transaction extension methods.
|
|
|
|
|
pub trait TransactionExt<'env> : Transaction<'env> + Sized { |
|
|
|
|
|
|
|
|
|
/// Commits the transaction.
|
|
|
|
|
///
|
|
|
|
@ -100,18 +96,12 @@ pub trait TransactionExt<'env> : Transaction<'env> + Sized { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl<'env, T> TransactionExt<'env> for T where T: Transaction<'env> {} |
|
|
|
|
|
|
|
|
|
/// An LMDB read-only transaction.
|
|
|
|
|
pub struct RoTransaction<'env> { |
|
|
|
|
txn: *mut ffi::MDB_txn, |
|
|
|
|
_marker: PhantomData<&'env ()>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl <'env> !Sync for RoTransaction<'env> {} |
|
|
|
|
impl <'env> !Send for RoTransaction<'env> {} |
|
|
|
|
|
|
|
|
|
#[unsafe_destructor] |
|
|
|
|
impl <'env> Drop for RoTransaction<'env> { |
|
|
|
|
fn drop(&mut self) { |
|
|
|
|
unsafe { ffi::mdb_txn_abort(self.txn) } |
|
|
|
@ -154,7 +144,7 @@ impl <'env> RoTransaction<'env> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl <'env> Transaction<'env> for RoTransaction<'env> { |
|
|
|
|
impl <'env> Transaction for RoTransaction<'env> { |
|
|
|
|
fn txn(&self) -> *mut ffi::MDB_txn { |
|
|
|
|
self.txn |
|
|
|
|
} |
|
|
|
@ -166,7 +156,6 @@ pub struct InactiveTransaction<'env> { |
|
|
|
|
_marker: PhantomData<&'env ()>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[unsafe_destructor] |
|
|
|
|
impl <'env> Drop for InactiveTransaction<'env> { |
|
|
|
|
fn drop(&mut self) { |
|
|
|
|
unsafe { ffi::mdb_txn_abort(self.txn) } |
|
|
|
@ -195,10 +184,6 @@ pub struct RwTransaction<'env> { |
|
|
|
|
_marker: PhantomData<&'env ()>, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl <'env> !Sync for RwTransaction<'env> {} |
|
|
|
|
impl <'env> !Send for RwTransaction<'env> {} |
|
|
|
|
|
|
|
|
|
#[unsafe_destructor] |
|
|
|
|
impl <'env> Drop for RwTransaction<'env> { |
|
|
|
|
fn drop(&mut self) { |
|
|
|
|
unsafe { ffi::mdb_txn_abort(self.txn) } |
|
|
|
@ -346,7 +331,7 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl <'env> Transaction<'env> for RwTransaction<'env> { |
|
|
|
|
impl <'env> Transaction for RwTransaction<'env> { |
|
|
|
|
fn txn(&self) -> *mut ffi::MDB_txn { |
|
|
|
|
self.txn |
|
|
|
|
} |
|
|
|
@ -359,7 +344,8 @@ mod test { |
|
|
|
|
use rand::{Rng, XorShiftRng}; |
|
|
|
|
use std::io::Write; |
|
|
|
|
use std::ptr; |
|
|
|
|
use std::sync::{Arc, Barrier, Future}; |
|
|
|
|
use std::sync::{Arc, Barrier}; |
|
|
|
|
use std::thread::{self, JoinHandle}; |
|
|
|
|
use test::{Bencher, black_box}; |
|
|
|
|
|
|
|
|
|
use tempdir::TempDir; |
|
|
|
@ -503,7 +489,7 @@ mod test { |
|
|
|
|
|
|
|
|
|
let n = 10usize; // Number of concurrent readers
|
|
|
|
|
let barrier = Arc::new(Barrier::new(n + 1)); |
|
|
|
|
let mut futures: Vec<Future<bool>> = Vec::with_capacity(n); |
|
|
|
|
let mut futures: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
|
|
|
|
|
let key = b"key"; |
|
|
|
|
let val = b"val"; |
|
|
|
@ -512,7 +498,7 @@ mod test { |
|
|
|
|
let reader_env = env.clone(); |
|
|
|
|
let reader_barrier = barrier.clone(); |
|
|
|
|
|
|
|
|
|
futures.push(Future::spawn(move|| { |
|
|
|
|
futures.push(thread::spawn(move|| { |
|
|
|
|
let db = reader_env.open_db(None).unwrap(); |
|
|
|
|
{ |
|
|
|
|
let txn = reader_env.begin_ro_txn().unwrap(); |
|
|
|
@ -535,7 +521,7 @@ mod test { |
|
|
|
|
txn.commit().unwrap(); |
|
|
|
|
barrier.wait(); |
|
|
|
|
|
|
|
|
|
assert!(futures.iter_mut().all(|b| b.get())) |
|
|
|
|
assert!(futures.into_iter().all(|b| b.join().unwrap())) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
@ -544,7 +530,7 @@ mod test { |
|
|
|
|
let env = Arc::new(Environment::new().open(dir.path()).unwrap()); |
|
|
|
|
|
|
|
|
|
let n = 10usize; // Number of concurrent writers
|
|
|
|
|
let mut futures: Vec<Future<bool>> = Vec::with_capacity(n); |
|
|
|
|
let mut futures: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
|
|
|
|
|
let key = "key"; |
|
|
|
|
let val = "val"; |
|
|
|
@ -552,7 +538,7 @@ mod test { |
|
|
|
|
for i in 0..n { |
|
|
|
|
let writer_env = env.clone(); |
|
|
|
|
|
|
|
|
|
futures.push(Future::spawn(move|| { |
|
|
|
|
futures.push(thread::spawn(move|| { |
|
|
|
|
let db = writer_env.open_db(None).unwrap(); |
|
|
|
|
let mut txn = writer_env.begin_rw_txn().unwrap(); |
|
|
|
|
txn.put(db, |
|
|
|
@ -563,7 +549,7 @@ mod test { |
|
|
|
|
txn.commit().is_ok() |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
assert!(futures.iter_mut().all(|b| b.get())); |
|
|
|
|
assert!(futures.into_iter().all(|b| b.join().unwrap())); |
|
|
|
|
|
|
|
|
|
let db = env.open_db(None).unwrap(); |
|
|
|
|
let txn = env.begin_ro_txn().unwrap(); |
|
|
|
|