From 19961b33ac6f7d29e7f66868992fecc990ddc39f Mon Sep 17 00:00:00 2001 From: Dan Burkert Date: Sun, 23 Nov 2014 19:52:25 -0800 Subject: [PATCH] Fix Transaction destructor bugs. Add concurrency tests. --- lmdb-sys/src/lib.rs | 2 +- src/environment.rs | 16 +++++ src/lib.rs | 1 + src/transaction.rs | 167 ++++++++++++++++++++++++++++++++++++++------ 4 files changed, 165 insertions(+), 21 deletions(-) diff --git a/lmdb-sys/src/lib.rs b/lmdb-sys/src/lib.rs index c80403a..01cc6fa 100644 --- a/lmdb-sys/src/lib.rs +++ b/lmdb-sys/src/lib.rs @@ -157,7 +157,7 @@ extern { pub fn mdb_dbi_open(txn: *mut MDB_txn, name: *const c_char, flags: c_uint, dbi: *mut MDB_dbi) -> c_int; pub fn mdb_stat(txn: *mut MDB_txn, dbi: MDB_dbi, stat: *mut MDB_stat) -> c_int; pub fn mdb_dbi_flags(txn: *mut MDB_txn, dbi: MDB_dbi, flags: *mut c_uint) -> c_int; - pub fn mdb_dbi_close(txn: *mut MDB_txn, dbi: MDB_dbi); + pub fn mdb_dbi_close(txn: *mut MDB_env, dbi: MDB_dbi); pub fn mdb_drop(txn: *mut MDB_txn, dbi: MDB_dbi, del: c_int) -> c_int; pub fn mdb_set_compare(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int; pub fn mdb_set_dupsort(txn: *mut MDB_txn, dbi: MDB_dbi, cmp: *mut MDB_cmp_func) -> c_int; diff --git a/src/environment.rs b/src/environment.rs index d681957..2e95347 100644 --- a/src/environment.rs +++ b/src/environment.rs @@ -2,6 +2,7 @@ use libc::{c_uint, size_t, mode_t}; use std::io::FilePermission; use std::ptr; +use Database; use error::{LmdbError, LmdbResult, lmdb_result}; use ffi; use ffi::MDB_env; @@ -52,6 +53,21 @@ impl Environment { lmdb_result(ffi::mdb_env_sync(self.env(), if force { 1 } else { 0 })) } } + + /// Close a database handle. Normally unnecessary. + /// + /// This call is not mutex protected. Handles should only be closed by a single thread, and only + /// if no other threads are going to reference the database handle or one of its cursors any + /// further. Do not close a handle if an existing transaction has modified its database. 
Doing + /// so can cause misbehavior from database corruption to errors like `MDB_BAD_VALSIZE` (since the + /// DB name is gone). + /// + /// Closing a database handle is not necessary, but lets `Transaction::open_database` reuse the + /// handle value. Usually it's better to set a bigger `EnvironmentBuilder::set_max_dbs`, unless + /// that value would be large. + pub unsafe fn close_db(&self, db: Database) { + ffi::mdb_dbi_close(self.env, db.dbi()) + } } impl Drop for Environment { diff --git a/src/lib.rs b/src/lib.rs index bc59db2..ec18e31 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ //! //! Provides the minimal amount of abstraction necessary to interact with LMDB safely in Rust. In //! general, the API is very similar to the LMDB [C-API](http://symas.com/mdb/doc/). + #![feature(phase, globs, macro_rules, unsafe_destructor, if_let)] #[phase(plugin, link)] extern crate log; diff --git a/src/transaction.rs b/src/transaction.rs index 5632d69..7c0620b 100644 --- a/src/transaction.rs +++ b/src/transaction.rs @@ -5,7 +5,7 @@ use std::kinds::marker; use environment::Environment; use error::{LmdbResult, lmdb_result}; use ffi; -use ffi::MDB_txn; +use ffi::{MDB_txn, MDB_dbi}; use flags::{DatabaseFlags, EnvironmentFlags, WriteFlags}; /// An LMDB transaction. @@ -66,7 +66,7 @@ impl <'a> Transaction<'a> { pub fn open_db(&self, name: Option<&str>, flags: DatabaseFlags) -> LmdbResult<Database<'a>> { let c_name = name.map(|n| n.to_c_str()); let name_ptr = if let Some(ref c_name) = c_name { c_name.as_ptr() } else { ptr::null() }; - let mut dbi: ffi::MDB_dbi = 0; + let mut dbi: MDB_dbi = 0; unsafe { try!(lmdb_result(ffi::mdb_dbi_open(self.txn, name_ptr, flags.bits(), &mut dbi))); } @@ -83,33 +83,22 @@ impl <'a> Transaction<'a> { Ok(DatabaseFlags::from_bits_truncate(flags)) } - /// Close a database handle. Normally unnecessary. - /// - /// This call is not mutex protected. 
Handles should only be closed by a single thread, and only - /// if no other threads are going to reference the database handle or one of its cursors any - /// further. Do not close a handle if an existing transaction has modified its database. Doing - /// so can cause misbehavior from database corruption to errors like `MDB_BAD_VALSIZE` (since the - /// DB name is gone). - /// - /// Closing a database handle is not necessary, but lets `Transaction::open_database` reuse the - /// handle value. Usually it's better to set a bigger `EnvironmentBuilder::set_max_dbs`, unless - /// that value would be large. - pub unsafe fn close_db(&self, db: Database) { - ffi::mdb_dbi_close(self.txn, db.dbi) - } - /// Commits the transaction. /// /// Any pending operations will be saved. pub fn commit(self) -> LmdbResult<()> { - unsafe { lmdb_result(ffi::mdb_txn_commit(self.txn())) } + unsafe { + let result = lmdb_result(ffi::mdb_txn_commit(self.txn())); + mem::forget(self); + result + } } /// Aborts the transaction. /// /// Any pending operations will not be saved. pub fn abort(self) { - unsafe { ffi::mdb_txn_abort(self.txn()) } + // Abort is called in the destructor } /// Gets an item from a database. @@ -197,14 +186,28 @@ impl <'a> Transaction<'a> { /// A database handle denotes the name and parameters of a database. The database may not /// exist in the environment. pub struct Database<'a> { - dbi: ffi::MDB_dbi, + dbi: MDB_dbi, _marker: marker::ContravariantLifetime<'a>, } +impl <'a> Copy for Database<'a> { } + +impl <'a> Database<'a> { + + /// Returns the underlying LMDB database handle. + /// + /// The caller **must** ensure that the handle is not used after the lifetime of the + /// environment, or after the database handle has been closed. 
+ pub fn dbi(&self) -> MDB_dbi { + self.dbi + } +} + #[cfg(test)] mod test { use std::io; + use std::sync::{Arc, Barrier, Future}; use environment::*; use flags::*; @@ -253,4 +256,128 @@ mod test { txn.del(&db, b"key1", None).unwrap(); assert!(txn.get(&db, b"key1").is_err()); } + + #[test] + fn test_close_database() { + let dir = io::TempDir::new("test").unwrap(); + let env = Arc::new(Environment::new() + .set_max_dbs(10) + .open(dir.path(), io::USER_RWX) + .unwrap()); + + let db1 = { + let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap(); + let db = txn.open_db(Some("db"), MDB_CREATE).unwrap(); + txn.commit().unwrap(); + db + }; + + let db2 = { + let txn = env.begin_txn(MDB_RDONLY).unwrap(); + let db = txn.open_db(Some("db"), DatabaseFlags::empty()).unwrap(); + txn.commit().unwrap(); + db + }; + + // Check that database handles are reused properly + assert!(db1.dbi == db2.dbi); + + { + let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap(); + txn.put(&db1, b"key1", b"val1", WriteFlags::empty()).unwrap(); + assert!(txn.commit().is_ok()); + } + + unsafe { env.close_db(db1) }; + + { + let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap(); + assert!(txn.put(&db1, b"key2", b"val2", WriteFlags::empty()).is_err()); + } + } + + #[test] + fn test_concurrent_readers_single_writer() { + let dir = io::TempDir::new("test").unwrap(); + let env = Arc::new(Environment::new().open(dir.path(), io::USER_RWX).unwrap()); + + let open_db_txn = env.begin_txn(MDB_RDONLY).unwrap(); + let db = Arc::new(open_db_txn.open_db(None, DatabaseFlags::empty()).unwrap()); + open_db_txn.commit().unwrap(); + + let n = 10u; // Number of concurrent readers + let barrier = Arc::new(Barrier::new(n + 1)); + let mut futures = Vec::with_capacity(n); + + let key = b"key"; + let val = b"val"; + + for _ in range(0, n) { + let reader_env = env.clone(); + let reader_db = db.clone(); + let reader_barrier = barrier.clone(); + + futures.push(Future::spawn(proc() { + { + let txn = 
reader_env.begin_txn(MDB_RDONLY).unwrap(); + assert!(txn.get(&*reader_db, key).is_err()); + txn.abort(); + } + reader_barrier.wait(); + reader_barrier.wait(); + { + let txn = reader_env.begin_txn(MDB_RDONLY).unwrap(); + txn.get(&*reader_db, key).unwrap() == val + } + })); + } + + let txn = env.begin_txn(EnvironmentFlags::empty()).unwrap(); + barrier.wait(); + txn.put(&*db, key, val, WriteFlags::empty()).unwrap(); + txn.commit().unwrap(); + barrier.wait(); + + assert!(futures.iter_mut().all(|b| b.get())) + } + + #[test] + fn test_concurrent_writers() { + let dir = io::TempDir::new("test").unwrap(); + let env = Arc::new(Environment::new().open(dir.path(), io::USER_RWX).unwrap()); + + let open_db_txn = env.begin_txn(MDB_RDONLY).unwrap(); + let db = Arc::new(open_db_txn.open_db(None, DatabaseFlags::empty()).unwrap()); + open_db_txn.commit().unwrap(); + + let n = 10u; // Number of concurrent writers + let mut futures = Vec::with_capacity(n); + + let key = "key"; + let val = "val"; + + for i in range(0, n) { + let writer_env = env.clone(); + let writer_db = db.clone(); + + futures.push(Future::spawn(proc() { + let txn = writer_env.begin_txn(EnvironmentFlags::empty()).unwrap(); + txn.put(&*writer_db, + format!("{}{}", key, i).as_bytes(), + format!("{}{}", val, i).as_bytes(), + WriteFlags::empty()) + .unwrap(); + txn.commit().is_ok() + })); + } + assert!(futures.iter_mut().all(|b| b.get())); + + let txn = env.begin_txn(MDB_RDONLY).unwrap(); + + for i in range(0, n) { + assert_eq!( + format!("{}{}", val, i).as_bytes(), + txn.get(&*db, format!("{}{}", key, i).as_bytes()).unwrap()); + } + } }