|
|
|
@ -17,8 +17,8 @@ pub trait Transaction : Sized { |
|
|
|
|
|
|
|
|
|
/// Returns a raw pointer to the underlying LMDB transaction.
|
|
|
|
|
///
|
|
|
|
|
/// The caller **must** ensure that the pointer is not used after the lifetime of the
|
|
|
|
|
/// transaction.
|
|
|
|
|
/// The caller **must** ensure that the pointer is not used after the
|
|
|
|
|
/// lifetime of the transaction.
|
|
|
|
|
fn txn(&self) -> *mut ffi::MDB_txn; |
|
|
|
|
|
|
|
|
|
/// Commits the transaction.
|
|
|
|
@ -41,31 +41,39 @@ pub trait Transaction : Sized { |
|
|
|
|
|
|
|
|
|
/// Opens a database in the transaction.
|
|
|
|
|
///
|
|
|
|
|
/// If `name` is `None`, then the default database will be opened, otherwise a named database
|
|
|
|
|
/// will be opened. The database handle will be private to the transaction until the transaction
|
|
|
|
|
/// is successfully committed. If the transaction is aborted the returned database handle
|
|
|
|
|
/// should no longer be used.
|
|
|
|
|
/// If `name` is `None`, then the default database will be opened, otherwise
|
|
|
|
|
/// a named database will be opened. The database handle will be private to
|
|
|
|
|
/// the transaction until the transaction is successfully committed. If the
|
|
|
|
|
/// transaction is aborted the returned database handle should no longer be
|
|
|
|
|
/// used.
|
|
|
|
|
///
|
|
|
|
|
/// Prefer using `Environment::open_db`.
|
|
|
|
|
///
|
|
|
|
|
/// ## Safety
|
|
|
|
|
///
|
|
|
|
|
/// This function (as well as `Environment::open_db`, `Environment::create_db`, and
|
|
|
|
|
/// `Database::create`) **must not** be called from multiple concurrent transactions in the same
|
|
|
|
|
/// environment. A transaction which uses this function must finish (either commit or abort)
|
|
|
|
|
/// before any other transaction may use this function.
|
|
|
|
|
/// This function (as well as `Environment::open_db`,
|
|
|
|
|
/// `Environment::create_db`, and `Database::create`) **must not** be called
|
|
|
|
|
/// from multiple concurrent transactions in the same environment. A
|
|
|
|
|
/// transaction which uses this function must finish (either commit or
|
|
|
|
|
/// abort) before any other transaction may use this function.
|
|
|
|
|
unsafe fn open_db(&self, name: Option<&str>) -> Result<Database> { |
|
|
|
|
Database::new(self.txn(), name, 0) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Gets an item from a database.
|
|
|
|
|
///
|
|
|
|
|
/// This function retrieves the data associated with the given key in the database. If the
|
|
|
|
|
/// database supports duplicate keys (`DatabaseFlags::DUP_SORT`) then the first data item for
|
|
|
|
|
/// the key will be returned. Retrieval of other items requires the use of
|
|
|
|
|
/// `Transaction::cursor_get`. If the item is not in the database, then `Error::NotFound`
|
|
|
|
|
/// will be returned.
|
|
|
|
|
fn get<'txn>(&'txn self, database: Database, key: &[u8]) -> Result<&'txn [u8]> { |
|
|
|
|
/// This function retrieves the data associated with the given key in the
|
|
|
|
|
/// database. If the database supports duplicate keys
|
|
|
|
|
/// (`DatabaseFlags::DUP_SORT`) then the first data item for the key will be
|
|
|
|
|
/// returned. Retrieval of other items requires the use of
|
|
|
|
|
/// `Transaction::cursor_get`. If the item is not in the database, then
|
|
|
|
|
/// `Error::NotFound` will be returned.
|
|
|
|
|
fn get<'txn, K>(&'txn self, |
|
|
|
|
database: Database, |
|
|
|
|
key: &K) |
|
|
|
|
-> Result<&'txn [u8]> |
|
|
|
|
where K: AsRef<[u8]> { |
|
|
|
|
let key = key.as_ref(); |
|
|
|
|
let mut key_val: ffi::MDB_val = ffi::MDB_val { mv_size: key.len() as size_t, |
|
|
|
|
mv_data: key.as_ptr() as *mut c_void }; |
|
|
|
|
let mut data_val: ffi::MDB_val = ffi::MDB_val { mv_size: 0, |
|
|
|
@ -110,8 +118,8 @@ impl <'env> Drop for RoTransaction<'env> { |
|
|
|
|
|
|
|
|
|
impl <'env> RoTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Creates a new read-only transaction in the given environment. Prefer using
|
|
|
|
|
/// `Environment::begin_ro_txn`.
|
|
|
|
|
/// Creates a new read-only transaction in the given environment. Prefer
|
|
|
|
|
/// using `Environment::begin_ro_txn`.
|
|
|
|
|
#[doc(hidden)] |
|
|
|
|
pub fn new(env: &'env Environment) -> Result<RoTransaction<'env>> { |
|
|
|
|
let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); |
|
|
|
@ -126,14 +134,16 @@ impl <'env> RoTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Resets the read-only transaction.
|
|
|
|
|
///
|
|
|
|
|
/// Abort the transaction like `Transaction::abort`, but keep the transaction handle.
|
|
|
|
|
/// `InactiveTransaction::renew` may reuse the handle. This saves allocation overhead if the
|
|
|
|
|
/// process will start a new read-only transaction soon, and also locking overhead if
|
|
|
|
|
/// `EnvironmentFlags::NO_TLS` is in use. The reader table lock is released, but the table slot
|
|
|
|
|
/// stays tied to its thread or transaction. Reader locks generally don't interfere with
|
|
|
|
|
/// writers, but they keep old versions of database pages allocated. Thus they prevent the old
|
|
|
|
|
/// pages from being reused when writers commit new data, and so under heavy load the database
|
|
|
|
|
/// size may grow much more rapidly than otherwise.
|
|
|
|
|
/// Abort the transaction like `Transaction::abort`, but keep the
|
|
|
|
|
/// transaction handle. `InactiveTransaction::renew` may reuse the handle.
|
|
|
|
|
/// This saves allocation overhead if the process will start a new read-only
|
|
|
|
|
/// transaction soon, and also locking overhead if
|
|
|
|
|
/// `EnvironmentFlags::NO_TLS` is in use. The reader table lock is released,
|
|
|
|
|
/// but the table slot stays tied to its thread or transaction. Reader locks
|
|
|
|
|
/// generally don't interfere with writers, but they keep old versions of
|
|
|
|
|
/// database pages allocated. Thus they prevent the old pages from being
|
|
|
|
|
/// reused when writers commit new data, and so under heavy load the
|
|
|
|
|
/// database size may grow much more rapidly than otherwise.
|
|
|
|
|
pub fn reset(self) -> InactiveTransaction<'env> { |
|
|
|
|
let txn = self.txn; |
|
|
|
|
unsafe { |
|
|
|
@ -164,10 +174,11 @@ impl <'env> Drop for InactiveTransaction<'env> { |
|
|
|
|
|
|
|
|
|
impl <'env> InactiveTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Renews the inactive transaction, returning an active read-only transaction.
|
|
|
|
|
/// Renews the inactive transaction, returning an active read-only
|
|
|
|
|
/// transaction.
|
|
|
|
|
///
|
|
|
|
|
/// This acquires a new reader lock for a transaction handle that had been released by
|
|
|
|
|
/// `RoTransaction::reset`.
|
|
|
|
|
/// This acquires a new reader lock for a transaction handle that had been
|
|
|
|
|
/// released by `RoTransaction::reset`.
|
|
|
|
|
pub fn renew(self) -> Result<RoTransaction<'env>> { |
|
|
|
|
let txn = self.txn; |
|
|
|
|
unsafe { |
|
|
|
@ -192,8 +203,8 @@ impl <'env> Drop for RwTransaction<'env> { |
|
|
|
|
|
|
|
|
|
impl <'env> RwTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Creates a new read-write transaction in the given environment. Prefer using
|
|
|
|
|
/// `Environment::begin_ro_txn`.
|
|
|
|
|
/// Creates a new read-write transaction in the given environment. Prefer
|
|
|
|
|
/// using `Environment::begin_ro_txn`.
|
|
|
|
|
#[doc(hidden)] |
|
|
|
|
pub fn new(env: &'env Environment) -> Result<RwTransaction<'env>> { |
|
|
|
|
let mut txn: *mut ffi::MDB_txn = ptr::null_mut(); |
|
|
|
@ -208,24 +219,25 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Opens a database in the provided transaction, creating it if necessary.
|
|
|
|
|
///
|
|
|
|
|
/// If `name` is `None`, then the default database will be opened, otherwise a named database
|
|
|
|
|
/// will be opened. The database handle will be private to the transaction until the transaction
|
|
|
|
|
/// is successfully committed. If the transaction is aborted the returned database handle
|
|
|
|
|
/// should no longer be used.
|
|
|
|
|
/// If `name` is `None`, then the default database will be opened, otherwise
|
|
|
|
|
/// a named database will be opened. The database handle will be private to
|
|
|
|
|
/// the transaction until the transaction is successfully committed. If the
|
|
|
|
|
/// transaction is aborted the returned database handle should no longer be
|
|
|
|
|
/// used.
|
|
|
|
|
///
|
|
|
|
|
/// Prefer using `Environment::create_db`.
|
|
|
|
|
///
|
|
|
|
|
/// ## Safety
|
|
|
|
|
///
|
|
|
|
|
/// * This function (as well as `Environment::open_db`, `Environment::create_db`, and
|
|
|
|
|
/// `Database::open`) **must not** be called from multiple concurrent transactions in the same
|
|
|
|
|
/// environment. A transaction which uses this function must finish (either commit or abort)
|
|
|
|
|
/// before any other transaction may use this function.
|
|
|
|
|
/// * This function (as well as `Environment::open_db`,
|
|
|
|
|
/// `Environment::create_db`, and `Database::open`) **must not** be called
|
|
|
|
|
/// from multiple concurrent transactions in the same environment. A
|
|
|
|
|
/// transaction which uses this function must finish (either commit or
|
|
|
|
|
/// abort) before any other transaction may use this function.
|
|
|
|
|
pub unsafe fn create_db(&self, name: Option<&str>, flags: DatabaseFlags) -> Result<Database> { |
|
|
|
|
Database::new(self.txn(), name, flags.bits() | ffi::MDB_CREATE) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Opens a new read-write cursor on the given database and transaction.
|
|
|
|
|
pub fn open_rw_cursor<'txn>(&'txn mut self, db: Database) -> Result<RwCursor<'txn>> { |
|
|
|
|
RwCursor::new(self, db) |
|
|
|
@ -233,15 +245,19 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Stores an item into a database.
|
|
|
|
|
///
|
|
|
|
|
/// This function stores key/data pairs in the database. The default behavior is to enter the
|
|
|
|
|
/// new key/data pair, replacing any previously existing key if duplicates are disallowed, or
|
|
|
|
|
/// adding a duplicate data item if duplicates are allowed (`DatabaseFlags::DUP_SORT`).
|
|
|
|
|
pub fn put(&mut self, |
|
|
|
|
/// This function stores key/data pairs in the database. The default
|
|
|
|
|
/// behavior is to enter the new key/data pair, replacing any previously
|
|
|
|
|
/// existing key if duplicates are disallowed, or adding a duplicate data
|
|
|
|
|
/// item if duplicates are allowed (`DatabaseFlags::DUP_SORT`).
|
|
|
|
|
pub fn put<K, D>(&mut self, |
|
|
|
|
database: Database, |
|
|
|
|
key: &[u8], |
|
|
|
|
data: &[u8], |
|
|
|
|
key: &K, |
|
|
|
|
data: &D, |
|
|
|
|
flags: WriteFlags) |
|
|
|
|
-> Result<()> { |
|
|
|
|
-> Result<()> |
|
|
|
|
where K: AsRef<[u8]>, D: AsRef<[u8]> { |
|
|
|
|
let key = key.as_ref(); |
|
|
|
|
let data = data.as_ref(); |
|
|
|
|
let mut key_val: ffi::MDB_val = ffi::MDB_val { mv_size: key.len() as size_t, |
|
|
|
|
mv_data: key.as_ptr() as *mut c_void }; |
|
|
|
|
let mut data_val: ffi::MDB_val = ffi::MDB_val { mv_size: data.len() as size_t, |
|
|
|
@ -255,14 +271,17 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// Returns a `BufWriter` which can be used to write a value into the item at the given key
|
|
|
|
|
/// and with the given length. The buffer must be completely filled by the caller.
|
|
|
|
|
pub fn reserve<'txn>(&'txn mut self, |
|
|
|
|
/// Returns a buffer which can be used to write a value into the item at the
|
|
|
|
|
/// given key and with the given length. The buffer must be completely
|
|
|
|
|
/// filled by the caller.
|
|
|
|
|
pub fn reserve<'txn, K>(&'txn mut self, |
|
|
|
|
database: Database, |
|
|
|
|
key: &[u8], |
|
|
|
|
key: &K, |
|
|
|
|
len: size_t, |
|
|
|
|
flags: WriteFlags) |
|
|
|
|
-> Result<&'txn mut [u8]> { |
|
|
|
|
-> Result<&'txn mut [u8]> |
|
|
|
|
where K: AsRef<[u8]> { |
|
|
|
|
let key = key.as_ref(); |
|
|
|
|
let mut key_val: ffi::MDB_val = ffi::MDB_val { mv_size: key.len() as size_t, |
|
|
|
|
mv_data: key.as_ptr() as *mut c_void }; |
|
|
|
|
let mut data_val: ffi::MDB_val = ffi::MDB_val { mv_size: len, |
|
|
|
@ -280,17 +299,21 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
|
|
|
|
|
/// Deletes an item from a database.
|
|
|
|
|
///
|
|
|
|
|
/// This function removes key/data pairs from the database. If the database does not support
|
|
|
|
|
/// sorted duplicate data items (`DatabaseFlags::DUP_SORT`) the data parameter is ignored.
|
|
|
|
|
/// If the database supports sorted duplicates and the data parameter is `None`, all of the
|
|
|
|
|
/// duplicate data items for the key will be deleted. Otherwise, if the data parameter is
|
|
|
|
|
/// `Some` only the matching data item will be deleted. This function will return
|
|
|
|
|
/// `Error::NotFound` if the specified key/data pair is not in the database.
|
|
|
|
|
pub fn del(&mut self, |
|
|
|
|
/// This function removes key/data pairs from the database. If the database
|
|
|
|
|
/// does not support sorted duplicate data items (`DatabaseFlags::DUP_SORT`)
|
|
|
|
|
/// the data parameter is ignored. If the database supports sorted
|
|
|
|
|
/// duplicates and the data parameter is `None`, all of the duplicate data
|
|
|
|
|
/// items for the key will be deleted. Otherwise, if the data parameter is
|
|
|
|
|
/// `Some` only the matching data item will be deleted. This function will
|
|
|
|
|
/// return `Error::NotFound` if the specified key/data pair is not in the
|
|
|
|
|
/// database.
|
|
|
|
|
pub fn del<K>(&mut self, |
|
|
|
|
database: Database, |
|
|
|
|
key: &[u8], |
|
|
|
|
key: &K, |
|
|
|
|
data: Option<&[u8]>) |
|
|
|
|
-> Result<()> { |
|
|
|
|
-> Result<()> |
|
|
|
|
where K: AsRef<[u8]> { |
|
|
|
|
let key = key.as_ref(); |
|
|
|
|
let mut key_val: ffi::MDB_val = ffi::MDB_val { mv_size: key.len() as size_t, |
|
|
|
|
mv_data: key.as_ptr() as *mut c_void }; |
|
|
|
|
let data_val: Option<ffi::MDB_val> = |
|
|
|
@ -314,8 +337,8 @@ impl <'env> RwTransaction<'env> { |
|
|
|
|
///
|
|
|
|
|
/// ## Safety
|
|
|
|
|
///
|
|
|
|
|
/// This method is unsafe in the same ways as `Environment::close_db`, and should be used
|
|
|
|
|
/// accordingly.
|
|
|
|
|
/// This method is unsafe in the same ways as `Environment::close_db`, and
|
|
|
|
|
/// should be used accordingly.
|
|
|
|
|
pub unsafe fn drop_db(&mut self, db: Database) -> Result<()> { |
|
|
|
|
lmdb_result(ffi::mdb_drop(self.txn, db.dbi(), 1)) |
|
|
|
|
} |
|
|
|
@ -489,7 +512,7 @@ mod test { |
|
|
|
|
|
|
|
|
|
let n = 10usize; // Number of concurrent readers
|
|
|
|
|
let barrier = Arc::new(Barrier::new(n + 1)); |
|
|
|
|
let mut futures: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
|
|
|
|
|
let key = b"key"; |
|
|
|
|
let val = b"val"; |
|
|
|
@ -498,7 +521,7 @@ mod test { |
|
|
|
|
let reader_env = env.clone(); |
|
|
|
|
let reader_barrier = barrier.clone(); |
|
|
|
|
|
|
|
|
|
futures.push(thread::spawn(move|| { |
|
|
|
|
threads.push(thread::spawn(move|| { |
|
|
|
|
let db = reader_env.open_db(None).unwrap(); |
|
|
|
|
{ |
|
|
|
|
let txn = reader_env.begin_ro_txn().unwrap(); |
|
|
|
@ -521,7 +544,7 @@ mod test { |
|
|
|
|
txn.commit().unwrap(); |
|
|
|
|
barrier.wait(); |
|
|
|
|
|
|
|
|
|
assert!(futures.into_iter().all(|b| b.join().unwrap())) |
|
|
|
|
assert!(threads.into_iter().all(|b| b.join().unwrap())) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[test] |
|
|
|
@ -530,7 +553,7 @@ mod test { |
|
|
|
|
let env = Arc::new(Environment::new().open(dir.path()).unwrap()); |
|
|
|
|
|
|
|
|
|
let n = 10usize; // Number of concurrent writers
|
|
|
|
|
let mut futures: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
let mut threads: Vec<JoinHandle<bool>> = Vec::with_capacity(n); |
|
|
|
|
|
|
|
|
|
let key = "key"; |
|
|
|
|
let val = "val"; |
|
|
|
@ -538,26 +561,25 @@ mod test { |
|
|
|
|
for i in 0..n { |
|
|
|
|
let writer_env = env.clone(); |
|
|
|
|
|
|
|
|
|
futures.push(thread::spawn(move|| { |
|
|
|
|
threads.push(thread::spawn(move|| { |
|
|
|
|
let db = writer_env.open_db(None).unwrap(); |
|
|
|
|
let mut txn = writer_env.begin_rw_txn().unwrap(); |
|
|
|
|
txn.put(db, |
|
|
|
|
format!("{}{}", key, i).as_bytes(), |
|
|
|
|
format!("{}{}", val, i).as_bytes(), |
|
|
|
|
&format!("{}{}", key, i), |
|
|
|
|
&format!("{}{}", val, i), |
|
|
|
|
WriteFlags::empty()) |
|
|
|
|
.unwrap(); |
|
|
|
|
txn.commit().is_ok() |
|
|
|
|
})); |
|
|
|
|
} |
|
|
|
|
assert!(futures.into_iter().all(|b| b.join().unwrap())); |
|
|
|
|
assert!(threads.into_iter().all(|b| b.join().unwrap())); |
|
|
|
|
|
|
|
|
|
let db = env.open_db(None).unwrap(); |
|
|
|
|
let txn = env.begin_ro_txn().unwrap(); |
|
|
|
|
|
|
|
|
|
for i in 0..n { |
|
|
|
|
assert_eq!( |
|
|
|
|
format!("{}{}", val, i).as_bytes(), |
|
|
|
|
txn.get(db, format!("{}{}", key, i).as_bytes()).unwrap()); |
|
|
|
|
assert_eq!(format!("{}{}", val, i).as_bytes(), |
|
|
|
|
txn.get(db, &format!("{}{}", key, i)).unwrap()); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -573,8 +595,8 @@ mod test { |
|
|
|
|
|
|
|
|
|
b.iter(|| { |
|
|
|
|
let mut i = 0usize; |
|
|
|
|
for key in keys.iter() { |
|
|
|
|
i = i + txn.get(db, key.as_bytes()).unwrap().len(); |
|
|
|
|
for key in &keys { |
|
|
|
|
i = i + txn.get(db, key).unwrap().len(); |
|
|
|
|
} |
|
|
|
|
black_box(i); |
|
|
|
|
}); |
|
|
|
@ -598,7 +620,7 @@ mod test { |
|
|
|
|
|
|
|
|
|
b.iter(|| unsafe { |
|
|
|
|
let mut i: size_t = 0; |
|
|
|
|
for key in keys.iter() { |
|
|
|
|
for key in &keys { |
|
|
|
|
key_val.mv_size = key.len() as size_t; |
|
|
|
|
key_val.mv_data = key.as_bytes().as_ptr() as *mut _; |
|
|
|
|
|
|
|
|
@ -622,7 +644,7 @@ mod test { |
|
|
|
|
b.iter(|| { |
|
|
|
|
let mut txn = env.begin_rw_txn().unwrap(); |
|
|
|
|
for &(ref key, ref data) in items.iter() { |
|
|
|
|
txn.put(db, key.as_bytes(), data.as_bytes(), WriteFlags::empty()).unwrap(); |
|
|
|
|
txn.put(db, key, data, WriteFlags::empty()).unwrap(); |
|
|
|
|
} |
|
|
|
|
txn.abort(); |
|
|
|
|
}); |
|
|
|
|