wrap lmdb-rs transactions in rkv abstraction (#116)

without.crypto
Myk Melez 5 years ago committed by GitHub
parent 743c5eae2f
commit 9b46134e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      examples/iterator.rs
  2. 7
      examples/simple-store.rs
  3. 27
      src/env.rs
  4. 21
      src/lib.rs
  5. 91
      src/readwrite.rs
  6. 21
      src/store.rs
  7. 23
      src/store/integer.rs
  8. 37
      src/store/integermulti.rs
  9. 35
      src/store/multi.rs
  10. 31
      src/store/single.rs
  11. 1
      tests/integer-store.rs
  12. 1
      tests/multi-integer-store.rs

@ -13,7 +13,6 @@ use rkv::{
SingleStore,
StoreError,
StoreOptions,
Transaction,
Value,
};
use tempfile::Builder;
@ -72,5 +71,5 @@ fn populate_store(k: &Rkv, store: SingleStore) -> Result<(), StoreError> {
] {
store.put(&mut writer, country, &city)?;
}
writer.commit().map_err(|e| e.into())
writer.commit()
}

@ -11,16 +11,15 @@ use rkv::{
Manager,
MultiStore,
Rkv,
RwTransaction,
StoreOptions,
Transaction,
Value,
Writer,
};
use tempfile::Builder;
use std::fs;
fn getput<'env, 's>(store: MultiStore, writer: &'env mut RwTransaction, ids: &'s mut Vec<String>) {
fn getput<'env, 's>(store: MultiStore, writer: &'env mut Writer, ids: &'s mut Vec<String>) {
let keys = vec!["str1", "str2", "str3"];
// we convert the writer into a cursor so that we can safely read
for k in keys.iter() {
@ -39,7 +38,7 @@ fn getput<'env, 's>(store: MultiStore, writer: &'env mut RwTransaction, ids: &'s
}
}
fn delete(store: MultiStore, writer: &mut RwTransaction) {
fn delete(store: MultiStore, writer: &mut Writer) {
let keys = vec!["str1", "str2", "str3"];
let vals = vec!["string uno", "string quatro", "string siete"];
// we convert the writer into a cursor so that we can safely read

@ -22,13 +22,14 @@ use lmdb::{
DatabaseFlags,
Environment,
EnvironmentBuilder,
RoTransaction,
RwTransaction,
Stat,
};
use crate::error::StoreError;
use crate::readwrite::{
Reader,
Writer,
};
use crate::store::integer::{
IntegerStore,
PrimitiveInt,
@ -166,15 +167,15 @@ impl Rkv {
/// Create a read transaction. There can be multiple concurrent readers
/// for an environment, up to the maximum specified by LMDB (default 126),
/// and you can open readers while a write transaction is active.
pub fn read(&self) -> Result<RoTransaction, StoreError> {
self.env.begin_ro_txn().map_err(|e| e.into())
pub fn read(&self) -> Result<Reader, StoreError> {
Ok(Reader::new(self.env.begin_ro_txn().map_err(StoreError::from)?))
}
/// Create a write transaction. There can be only one write transaction
/// active at any given time, so trying to create a second one will block
/// until the first is committed or aborted.
pub fn write(&self) -> Result<RwTransaction, StoreError> {
self.env.begin_rw_txn().map_err(|e| e.into())
pub fn write(&self) -> Result<Writer, StoreError> {
Ok(Writer::new(self.env.begin_rw_txn().map_err(StoreError::from)?))
}
}
@ -477,9 +478,15 @@ mod tests {
}
writer.commit().unwrap();
let mut writer = k.write().unwrap();
multistore.delete(&mut writer, "str1", &Value::Str("str1 foo")).unwrap();
assert_eq!(multistore.get_first(&writer, "str1").unwrap(), Some(Value::Str("str1 bar")));
multistore.delete(&mut writer, "str2", &Value::Str("str2 bar")).unwrap();
multistore.delete(&mut writer, "str3", &Value::Str("str3 bar")).unwrap();
assert_eq!(multistore.get_first(&writer, "str2").unwrap(), Some(Value::Str("str2 foo")));
multistore.delete_all(&mut writer, "str3").unwrap();
assert_eq!(multistore.get_first(&writer, "str3").unwrap(), None);
writer.commit().unwrap();
}
@ -540,8 +547,8 @@ mod tests {
// as the Value::I64 borrows an immutable reference to the Writer.
// So we extract and copy its primitive value.
fn get_existing_foo(txn: &RwTransaction, store: SingleStore) -> Option<i64> {
match store.get(txn, "foo").expect("read") {
fn get_existing_foo(writer: &Writer, store: SingleStore) -> Option<i64> {
match store.get(writer, "foo").expect("read") {
Some(Value::I64(val)) => Some(val),
_ => None,
}

@ -40,7 +40,7 @@
//!
//! ## Basic Usage
//! ```
//! use rkv::{Manager, Rkv, SingleStore, Value, Transaction, StoreOptions};
//! use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions};
//! use std::fs;
//! use tempfile::Builder;
//!
@ -175,22 +175,17 @@
#![allow(dead_code)]
use lmdb;
pub use lmdb::{
DatabaseFlags,
EnvironmentBuilder,
EnvironmentFlags,
Error as LmdbError,
RoTransaction,
RwTransaction,
Transaction,
WriteFlags,
};
mod env;
pub mod error;
mod manager;
mod readwrite;
pub mod store;
pub mod value;
@ -202,6 +197,10 @@ pub use lmdb::{
Stat,
};
pub use self::readwrite::{
Reader,
Writer,
};
pub use self::store::integer::{
IntegerStore,
PrimitiveInt,
@ -224,3 +223,11 @@ pub use self::value::{
OwnedValue,
Value,
};
fn read_transform(val: Result<&[u8], lmdb::Error>) -> Result<Option<Value>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(Some).map_err(StoreError::DataError),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(StoreError::LmdbError(e)),
}
}

@ -0,0 +1,91 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lmdb::{
Database,
RoCursor,
RoTransaction,
RwTransaction,
Transaction,
WriteFlags,
};
use crate::error::StoreError;
use crate::read_transform;
use crate::value::Value;
pub struct Reader<'env>(pub RoTransaction<'env>);
pub struct Writer<'env>(pub RwTransaction<'env>);
pub trait Readable {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError>;
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError>;
}
impl<'env> Readable for Reader<'env> {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError> {
let bytes = self.0.get(db, &k);
read_transform(bytes)
}
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(StoreError::LmdbError)
}
}
impl<'env> Reader<'env> {
pub(crate) fn new(txn: RoTransaction) -> Reader {
Reader(txn)
}
pub fn abort(self) {
self.0.abort();
}
}
impl<'env> Readable for Writer<'env> {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError> {
let bytes = self.0.get(db, &k);
read_transform(bytes)
}
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(StoreError::LmdbError)
}
}
impl<'env> Writer<'env> {
pub(crate) fn new(txn: RwTransaction) -> Writer {
Writer(txn)
}
pub fn commit(self) -> Result<(), StoreError> {
self.0.commit().map_err(StoreError::LmdbError)
}
pub fn abort(self) {
self.0.abort();
}
pub(crate) fn put<K: AsRef<[u8]>>(
&mut self,
db: Database,
k: &K,
v: &Value,
flags: WriteFlags,
) -> Result<(), StoreError> {
// TODO: don't allocate twice.
self.0.put(db, &k, &v.to_bytes()?, flags).map_err(StoreError::LmdbError)
}
pub(crate) fn delete<K: AsRef<[u8]>>(&mut self, db: Database, k: &K, v: Option<&[u8]>) -> Result<(), StoreError> {
self.0.del(db, &k, v).map_err(StoreError::LmdbError)
}
}

@ -3,11 +3,6 @@ pub mod integermulti;
pub mod multi;
pub mod single;
use crate::{
error::StoreError,
value::OwnedValue,
value::Value,
};
use lmdb::DatabaseFlags;
#[derive(Default, Debug, Copy, Clone)]
@ -24,19 +19,3 @@ impl Options {
}
}
}
fn read_transform(val: Result<&[u8], lmdb::Error>) -> Result<Option<Value>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(Some).map_err(StoreError::DataError),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(StoreError::LmdbError(e)),
}
}
fn read_transform_owned(val: Result<&[u8], lmdb::Error>) -> Result<Option<OwnedValue>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(|v| Some(OwnedValue::from(&v))).map_err(StoreError::DataError),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(StoreError::LmdbError(e)),
}
}

@ -14,17 +14,18 @@ use bincode::serialize;
use serde::Serialize;
use lmdb::{
Database,
RwTransaction,
Transaction,
};
use lmdb::Database;
use crate::error::{
DataError,
StoreError,
};
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
use crate::store::single::SingleStore;
@ -93,16 +94,16 @@ where
}
}
pub fn get<'env, T: Transaction>(&self, txn: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
self.inner.get(txn, Key::new(&k)?)
pub fn get<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
self.inner.get(reader, Key::new(&k)?)
}
pub fn put(&self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.put(txn, Key::new(&k)?, v)
pub fn put(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.put(writer, Key::new(&k)?, v)
}
pub fn delete(&self, txn: &mut RwTransaction, k: K) -> Result<(), StoreError> {
self.inner.delete(txn, Key::new(&k)?)
pub fn delete(&self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
self.inner.delete(writer, Key::new(&k)?)
}
}

@ -10,8 +10,6 @@
use lmdb::{
Database,
RwTransaction,
Transaction,
WriteFlags,
};
@ -19,6 +17,11 @@ use std::marker::PhantomData;
use crate::error::StoreError;
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
use crate::store::multi::{
@ -50,34 +53,28 @@ where
}
}
pub fn get<'env, T: Transaction>(&self, txn: &'env T, k: K) -> Result<Iter<'env>, StoreError> {
self.inner.get(txn, Key::new(&k)?)
pub fn get<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Iter<'env>, StoreError> {
self.inner.get(reader, Key::new(&k)?)
}
pub fn get_first<'env, T: Transaction>(&self, txn: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
self.inner.get_first(txn, Key::new(&k)?)
pub fn get_first<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
self.inner.get_first(reader, Key::new(&k)?)
}
pub fn put(&self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.put(txn, Key::new(&k)?, v)
pub fn put(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.put(writer, Key::new(&k)?, v)
}
pub fn put_with_flags(
&self,
txn: &mut RwTransaction,
k: K,
v: &Value,
flags: WriteFlags,
) -> Result<(), StoreError> {
self.inner.put_with_flags(txn, Key::new(&k)?, v, flags)
pub fn put_with_flags(&self, writer: &mut Writer, k: K, v: &Value, flags: WriteFlags) -> Result<(), StoreError> {
self.inner.put_with_flags(writer, Key::new(&k)?, v, flags)
}
pub fn delete_all(&self, txn: &mut RwTransaction, k: K) -> Result<(), StoreError> {
self.inner.delete_all(txn, Key::new(&k)?)
pub fn delete_all(&self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
self.inner.delete_all(writer, Key::new(&k)?)
}
pub fn delete(&self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.delete(txn, Key::new(&k)?, v)
pub fn delete(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.delete(writer, Key::new(&k)?, v)
}
}

@ -10,7 +10,11 @@
use crate::{
error::StoreError,
store::read_transform,
read_transform,
readwrite::{
Readable,
Writer,
},
value::Value,
};
use lmdb::{
@ -19,8 +23,6 @@ use lmdb::{
Iter as LmdbIter,
// IterDup as LmdbIterDup,
RoCursor,
RwTransaction,
Transaction,
WriteFlags,
};
@ -42,8 +44,8 @@ impl MultiStore {
}
/// Provides a cursor to all of the values for the duplicate entries that match this key
pub fn get<T: Transaction, K: AsRef<[u8]>>(self, txn: &T, k: K) -> Result<Iter, StoreError> {
let mut cursor = txn.open_ro_cursor(self.db).map_err(StoreError::LmdbError)?;
pub fn get<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Iter, StoreError> {
let mut cursor = reader.open_ro_cursor(self.db)?;
let iter = cursor.iter_dup_of(k);
Ok(Iter {
iter,
@ -52,36 +54,33 @@ impl MultiStore {
}
/// Provides the first value that matches this key
pub fn get_first<T: Transaction, K: AsRef<[u8]>>(self, txn: &T, k: K) -> Result<Option<Value>, StoreError> {
let result = txn.get(self.db, &k);
read_transform(result)
pub fn get_first<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Option<Value>, StoreError> {
reader.get(self.db, &k)
}
/// Insert a value at the specified key.
/// This put will allow duplicate entries. If you wish to have duplicate entries
/// rejected, use the `put_with_flags` function and specify NO_DUP_DATA
pub fn put<K: AsRef<[u8]>>(self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
let bytes = v.to_bytes()?;
txn.put(self.db, &k, &bytes, WriteFlags::empty()).map_err(StoreError::LmdbError)
pub fn put<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
writer.put(self.db, &k, v, WriteFlags::empty())
}
pub fn put_with_flags<K: AsRef<[u8]>>(
self,
txn: &mut RwTransaction,
writer: &mut Writer,
k: K,
v: &Value,
flags: WriteFlags,
) -> Result<(), StoreError> {
let bytes = v.to_bytes()?;
txn.put(self.db, &k, &bytes, flags).map_err(StoreError::LmdbError)
writer.put(self.db, &k, v, flags)
}
pub fn delete_all<K: AsRef<[u8]>>(self, txn: &mut RwTransaction, k: K) -> Result<(), StoreError> {
txn.del(self.db, &k, None).map_err(StoreError::LmdbError)
pub fn delete_all<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
writer.delete(self.db, &k, None)
}
pub fn delete<K: AsRef<[u8]>>(self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
txn.del(self.db, &k, Some(&v.to_bytes()?)).map_err(StoreError::LmdbError)
pub fn delete<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
writer.delete(self.db, &k, Some(&v.to_bytes()?))
}
/* TODO - Figure out how to solve the need to have the cursor stick around when

@ -10,7 +10,11 @@
use crate::{
error::StoreError,
store::read_transform,
read_transform,
readwrite::{
Readable,
Writer,
},
value::Value,
};
use lmdb::{
@ -18,8 +22,6 @@ use lmdb::{
Database,
Iter as LmdbIter,
RoCursor,
RwTransaction,
Transaction,
WriteFlags,
};
@ -40,24 +42,21 @@ impl SingleStore {
}
}
pub fn get<T: Transaction, K: AsRef<[u8]>>(self, txn: &T, k: K) -> Result<Option<Value>, StoreError> {
let bytes = txn.get(self.db, &k);
read_transform(bytes)
pub fn get<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Option<Value>, StoreError> {
reader.get(self.db, &k)
}
// TODO: flags
pub fn put<K: AsRef<[u8]>>(self, txn: &mut RwTransaction, k: K, v: &Value) -> Result<(), StoreError> {
// TODO: don't allocate twice.
let bytes = v.to_bytes()?;
txn.put(self.db, &k, &bytes, WriteFlags::empty()).map_err(StoreError::LmdbError)
pub fn put<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
writer.put(self.db, &k, v, WriteFlags::empty())
}
pub fn delete<K: AsRef<[u8]>>(self, txn: &mut RwTransaction, k: K) -> Result<(), StoreError> {
txn.del(self.db, &k, None).map_err(StoreError::LmdbError)
pub fn delete<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
writer.delete(self.db, &k, None)
}
pub fn iter_start<T: Transaction>(self, txn: &T) -> Result<Iter, StoreError> {
let mut cursor = txn.open_ro_cursor(self.db).map_err(StoreError::LmdbError)?;
pub fn iter_start<T: Readable>(self, reader: &T) -> Result<Iter, StoreError> {
let mut cursor = reader.open_ro_cursor(self.db)?;
// We call Cursor.iter() instead of Cursor.iter_start() because
// the latter panics at "called `Result::unwrap()` on an `Err` value:
@ -75,8 +74,8 @@ impl SingleStore {
})
}
pub fn iter_from<T: Transaction, K: AsRef<[u8]>>(self, txn: &T, k: K) -> Result<Iter, StoreError> {
let mut cursor = txn.open_ro_cursor(self.db).map_err(StoreError::LmdbError)?;
pub fn iter_from<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Iter, StoreError> {
let mut cursor = reader.open_ro_cursor(self.db)?;
let iter = cursor.iter_from(k);
Ok(Iter {
iter,

@ -12,7 +12,6 @@ use rkv::{
PrimitiveInt,
Rkv,
StoreOptions,
Transaction,
Value,
};
use serde_derive::Serialize;

@ -12,7 +12,6 @@ use rkv::{
PrimitiveInt,
Rkv,
StoreOptions,
Transaction,
Value,
};
use serde_derive::Serialize;

Loading…
Cancel
Save