Fix Transaction destructor bugs. Add concurrency tests.

without.crypto
Dan Burkert 10 years ago
parent a40c89662a
commit 19961b33ac
  1. 2
      lmdb-sys/src/lib.rs
  2. 16
      src/environment.rs
  3. 1
      src/lib.rs
  4. 167
      src/transaction.rs

@ -157,7 +157,7 @@ extern {
pub fn mdb_dbi_open(txn: *mut MDB_txn, name: *const c_char, flags: c_uint, dbi: *mut MDB_dbi) -> c_int; pub fn mdb_dbi_open(txn: *mut MDB_txn, name: *const c_char, flags: c_uint, dbi: *mut MDB_dbi) -> c_int;
pub fn mdb_stat(txn: *mut MDB_txn, dbi: MDB_dbi, stat: *mut MDB_stat) -> c_int; pub fn mdb_stat(txn: *mut MDB_txn, dbi: MDB_dbi, stat: *mut MDB_stat) -> c_int;
pub fn mdb_dbi_flags(txn: *mut MDB_txn, dbi: MDB_dbi, flags: *mut c_uint) -> c_int; pub fn mdb_dbi_flags(txn: *mut MDB_txn, dbi: MDB_dbi, flags: *mut c_uint) -> c_int;
pub fn mdb_dbi_close(txn: *mut MDB_txn, dbi: MDB_dbi); pub fn mdb_dbi_close(txn: *mut MDB_env, dbi: MDB_dbi);
pub fn mdb_drop(txn: *mut MDB_txn, dbi: MDB_dbi, del: c_int) -> c_int; pub fn mdb_drop(txn: *mut MDB_txn, dbi: MDB_dbi, del: c_int) -> c_int;
pub fn mdb_set_compare(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int; pub fn mdb_set_compare(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int;
pub fn mdb_set_dupsort(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int; pub fn mdb_set_dupsort(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int;

@ -2,6 +2,7 @@ use libc::{c_uint, size_t, mode_t};
use std::io::FilePermission; use std::io::FilePermission;
use std::ptr; use std::ptr;
use Database;
use error::{LmdbError, LmdbResult, lmdb_result}; use error::{LmdbError, LmdbResult, lmdb_result};
use ffi; use ffi;
use ffi::MDB_env; use ffi::MDB_env;
@ -52,6 +53,21 @@ impl Environment {
lmdb_result(ffi::mdb_env_sync(self.env(), if force { 1 } else { 0 })) lmdb_result(ffi::mdb_env_sync(self.env(), if force { 1 } else { 0 }))
} }
} }
/// Close a database handle. Normally unnecessary.
///
/// This call is not mutex protected. Handles should only be closed by a single thread, and only
/// if no other threads are going to reference the database handle or one of its cursors any
/// further. Do not close a handle if an existing transaction has modified its database. Doing
/// so can cause misbehavior from database corruption to errors like `MDB_BAD_VALSIZE` (since the
/// DB name is gone).
///
/// Closing a database handle is not necessary, but lets `Transaction::open_database` reuse the
/// handle value. Usually it's better to set a bigger `EnvironmentBuilder::set_max_dbs`, unless
/// that value would be large.
pub unsafe fn close_db(&self, db: Database) {
ffi::mdb_dbi_close(self.env, db.dbi())
}
} }
impl Drop for Environment { impl Drop for Environment {

@ -3,6 +3,7 @@
//! //!
//! Provides the minimal amount of abstraction necessary to interact with LMDB safely in Rust. In //! Provides the minimal amount of abstraction necessary to interact with LMDB safely in Rust. In
//! general, the API is very similar to the LMDB [C-API](http://symas.com/mdb/doc/). //! general, the API is very similar to the LMDB [C-API](http://symas.com/mdb/doc/).
#![feature(phase, globs, macro_rules, unsafe_destructor, if_let)] #![feature(phase, globs, macro_rules, unsafe_destructor, if_let)]
#[phase(plugin, link)] extern crate log; #[phase(plugin, link)] extern crate log;

@ -5,7 +5,7 @@ use std::kinds::marker;
use environment::Environment; use environment::Environment;
use error::{LmdbResult, lmdb_result}; use error::{LmdbResult, lmdb_result};
use ffi; use ffi;
use ffi::MDB_txn; use ffi::{MDB_txn, MDB_dbi};
use flags::{DatabaseFlags, EnvironmentFlags, WriteFlags}; use flags::{DatabaseFlags, EnvironmentFlags, WriteFlags};
/// An LMDB transaction. /// An LMDB transaction.
@ -66,7 +66,7 @@ impl <'a> Transaction<'a> {
pub fn open_db(&self, name: Option<&str>, flags: DatabaseFlags) -> LmdbResult<Database<'a>> { pub fn open_db(&self, name: Option<&str>, flags: DatabaseFlags) -> LmdbResult<Database<'a>> {
let c_name = name.map(|n| n.to_c_str()); let c_name = name.map(|n| n.to_c_str());
let name_ptr = if let Some(ref c_name) = c_name { c_name.as_ptr() } else { ptr::null() }; let name_ptr = if let Some(ref c_name) = c_name { c_name.as_ptr() } else { ptr::null() };
let mut dbi: ffi::MDB_dbi = 0; let mut dbi: MDB_dbi = 0;
unsafe { unsafe {
try!(lmdb_result(ffi::mdb_dbi_open(self.txn, name_ptr, flags.bits(), &mut dbi))); try!(lmdb_result(ffi::mdb_dbi_open(self.txn, name_ptr, flags.bits(), &mut dbi)));
} }
@ -83,33 +83,22 @@ impl <'a> Transaction<'a> {
Ok(DatabaseFlags::from_bits_truncate(flags)) Ok(DatabaseFlags::from_bits_truncate(flags))
} }
/// Close a database handle. Normally unnecessary.
///
/// This call is not mutex protected. Handles should only be closed by a single thread, and only
/// if no other threads are going to reference the database handle or one of its cursors any
/// further. Do not close a handle if an existing transaction has modified its database. Doing
/// so can cause misbehavior from database corruption to errors like `MDB_BAD_VALSIZE` (since the
/// DB name is gone).
///
/// Closing a database handle is not necessary, but lets `Transaction::open_database` reuse the
/// handle value. Usually it's better to set a bigger `EnvironmentBuilder::set_max_dbs`, unless
/// that value would be large.
pub unsafe fn close_db(&self, db: Database) {
ffi::mdb_dbi_close(self.txn, db.dbi)
}
/// Commits the transaction. /// Commits the transaction.
/// ///
/// Any pending operations will be saved. /// Any pending operations will be saved.
pub fn commit(self) -> LmdbResult<()> { pub fn commit(self) -> LmdbResult<()> {
unsafe { lmdb_result(ffi::mdb_txn_commit(self.txn())) } unsafe {
let result = lmdb_result(ffi::mdb_txn_commit(self.txn()));
mem::forget(self);
result
}
} }
/// Aborts the transaction. /// Aborts the transaction.
/// ///
/// Any pending operations will not be saved. /// Any pending operations will not be saved.
pub fn abort(self) { pub fn abort(self) {
unsafe { ffi::mdb_txn_abort(self.txn()) } // Abort is called in the destructor
} }
/// Gets an item from a database. /// Gets an item from a database.
@ -197,14 +186,28 @@ impl <'a> Transaction<'a> {
/// A database handle denotes the name and parameters of a database. The database may not /// A database handle denotes the name and parameters of a database. The database may not
/// exist in the environment. /// exist in the environment.
pub struct Database<'a> { pub struct Database<'a> {
dbi: ffi::MDB_dbi, dbi: MDB_dbi,
_marker: marker::ContravariantLifetime<'a>, _marker: marker::ContravariantLifetime<'a>,
} }
impl <'a> Copy for Database<'a> { }
impl <'a> Database<'a> {
/// Returns the underlying LMDB database handle.
///
/// The caller **must** ensure that the handle is not used after the lifetime of the
/// environment, or after the database handle has been closed.
pub fn dbi(&self) -> MDB_dbi {
self.dbi
}
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io; use std::io;
use std::sync::{Arc, Barrier, Future};
use environment::*; use environment::*;
use flags::*; use flags::*;
@ -253,4 +256,128 @@ mod test {
txn.del(&db, b"key1", None).unwrap(); txn.del(&db, b"key1", None).unwrap();
assert!(txn.get(&db, b"key1").is_err()); assert!(txn.get(&db, b"key1").is_err());
} }
#[test]
fn test_close_database() {
let dir = io::TempDir::new("test").unwrap();
let env = Arc::new(Environment::new()
.set_max_dbs(10)
.open(dir.path(), io::USER_RWX)
.unwrap());
let db1 = {
let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap();
let db = txn.open_db(Some("db"), MDB_CREATE).unwrap();
txn.commit().unwrap();
db
};
let db2 = {
let txn = env.begin_txn(MDB_RDONLY).unwrap();
let db = txn.open_db(Some("db"), DatabaseFlags::empty()).unwrap();
txn.commit().unwrap();
db
};
// Check that database handles are reused properly
assert!(db1.dbi == db2.dbi);
{
let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap();
txn.put(&db1, b"key1", b"val1", WriteFlags::empty()).unwrap();
assert!(txn.commit().is_ok());
}
unsafe { env.close_db(db1) };
{
let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap();
assert!(txn.put(&db1, b"key2", b"val2", WriteFlags::empty()).is_err());
}
}
#[test]
fn test_concurrent_readers_single_writer() {
let dir = io::TempDir::new("test").unwrap();
let env = Arc::new(Environment::new().open(dir.path(), io::USER_RWX).unwrap());
let open_db_txn = env.begin_txn(MDB_RDONLY).unwrap();
let db = Arc::new(open_db_txn.open_db(None, DatabaseFlags::empty()).unwrap());
open_db_txn.commit().unwrap();
let n = 10u; // Number of concurrent readers
let barrier = Arc::new(Barrier::new(n + 1));
let mut futures = Vec::with_capacity(n);
let key = b"key";
let val = b"val";
for _ in range(0, n) {
let reader_env = env.clone();
let reader_db = db.clone();
let reader_barrier = barrier.clone();
futures.push(Future::spawn(proc() {
{
let txn = reader_env.begin_txn(MDB_RDONLY).unwrap();
assert!(txn.get(&*reader_db, key).is_err());
txn.abort();
}
reader_barrier.wait();
reader_barrier.wait();
{
let txn = reader_env.begin_txn(MDB_RDONLY).unwrap();
txn.get(&*reader_db, key).unwrap() == val
}
}));
}
let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap();
barrier.wait();
txn.put(&*db, key, val, WriteFlags::empty()).unwrap();
txn.commit().unwrap();
barrier.wait();
assert!(futures.iter_mut().all(|b| b.get()))
}
#[test]
fn test_concurrent_writers() {
let dir = io::TempDir::new("test").unwrap();
let env = Arc::new(Environment::new().open(dir.path(), io::USER_RWX).unwrap());
let open_db_txn = env.begin_txn(MDB_RDONLY).unwrap();
let db = Arc::new(open_db_txn.open_db(None, DatabaseFlags::empty()).unwrap());
open_db_txn.commit().unwrap();
let n = 10u; // Number of concurrent writers
let mut futures = Vec::with_capacity(n);
let key = "key";
let val = "val";
for i in range(0, n) {
let writer_env = env.clone();
let writer_db = db.clone();
futures.push(Future::spawn(proc() {
let txn = writer_env.begin_txn(EnvironmentFlags::empty()).unwrap();
txn.put(&*writer_db,
format!("{}{}", key, i).as_bytes(),
format!("{}{}", val, i).as_bytes(),
WriteFlags::empty())
.unwrap();
txn.commit().is_ok()
}));
}
assert!(futures.iter_mut().all(|b| b.get()));
let txn = env.begin_txn(MDB_RDONLY).unwrap();
for i in range(0, n) {
assert_eq!(
format!("{}{}", val, i).as_bytes(),
txn.get(&*db, format!("{}{}", key, i).as_bytes()).unwrap());
}
}
} }

Loading…
Cancel
Save