gh46 part 1: move reader/writer from store to rkv

without.crypto
Nan Jiang 6 years ago
parent 3b74a36038
commit e068f86595
  1. 112
      examples/iterator.rs
  2. 198
      examples/simple-store.rs
  3. 465
      src/env.rs
  4. 342
      src/integer.rs
  5. 95
      src/lib.rs
  6. 85
      src/readwrite.rs

@ -7,71 +7,71 @@
//!
//! cargo run --example iterator
extern crate rkv;
extern crate tempfile;
// extern crate rkv;
// extern crate tempfile;
use rkv::{
Manager,
Rkv,
Store,
StoreError,
Value,
};
use tempfile::Builder;
// use rkv::{
// Manager,
// Rkv,
// Store,
// StoreError,
// Value,
// };
// use tempfile::Builder;
use std::fs;
use std::str;
// use std::fs;
// use std::str;
fn main() {
let root = Builder::new().prefix("iterator").tempdir().unwrap();
fs::create_dir_all(root.path()).unwrap();
let p = root.path();
// let root = Builder::new().prefix("iterator").tempdir().unwrap();
// fs::create_dir_all(root.path()).unwrap();
// let p = root.path();
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
let k = created_arc.read().unwrap();
let store: Store<&str> = k.open_or_create("store").unwrap();
// let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
// let k = created_arc.read().unwrap();
// let store: Store<&str> = k.open_or_create("store").unwrap();
populate_store(&k, &store).unwrap();
// populate_store(&k, &store).unwrap();
let reader = store.read(&k).unwrap();
// let reader = store.read(&k).unwrap();
println!("Iterating from the beginning...");
// Reader::iter_start() iterates from the first item in the store, and
// returns the (key, value) tuples in order.
let mut iter = reader.iter_start().unwrap();
while let Some((country, city)) = iter.next() {
println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
}
// println!("Iterating from the beginning...");
// // Reader::iter_start() iterates from the first item in the store, and
// // returns the (key, value) tuples in order.
// let mut iter = reader.iter_start().unwrap();
// while let Some((country, city)) = iter.next() {
// println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
// }
println!("");
println!("Iterating from the given key...");
// Reader::iter_from() iterates from the first key equal to or greater
// than the given key.
let mut iter = reader.iter_from("Japan").unwrap();
while let Some((country, city)) = iter.next() {
println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
}
// println!("");
// println!("Iterating from the given key...");
// // Reader::iter_from() iterates from the first key equal to or greater
// // than the given key.
// let mut iter = reader.iter_from("Japan").unwrap();
// while let Some((country, city)) = iter.next() {
// println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
// }
println!("");
println!("Iterating from the given prefix...");
let mut iter = reader.iter_from("Un").unwrap();
while let Some((country, city)) = iter.next() {
println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
}
}
// println!("");
// println!("Iterating from the given prefix...");
// let mut iter = reader.iter_from("Un").unwrap();
// while let Some((country, city)) = iter.next() {
// println!("{}, {:?}", str::from_utf8(country).unwrap(), city);
// }
// }
fn populate_store(k: &Rkv, store: &Store<&str>) -> Result<(), StoreError> {
let mut writer = store.write(k)?;
for (country, city) in vec![
("Canada", Value::Str("Ottawa")),
("United States of America", Value::Str("Washington")),
("Germany", Value::Str("Berlin")),
("France", Value::Str("Paris")),
("Italy", Value::Str("Rome")),
("United Kingdom", Value::Str("London")),
("Japan", Value::Str("Tokyo")),
] {
writer.put(country, &city)?;
}
writer.commit()
// fn populate_store(k: &Rkv, store: &Store<&str>) -> Result<(), StoreError> {
// let mut writer = store.write(k)?;
// for (country, city) in vec![
// ("Canada", Value::Str("Ottawa")),
// ("United States of America", Value::Str("Washington")),
// ("Germany", Value::Str("Berlin")),
// ("France", Value::Str("Paris")),
// ("Italy", Value::Str("Rome")),
// ("United Kingdom", Value::Str("London")),
// ("Japan", Value::Str("Tokyo")),
// ] {
// writer.put(country, &city)?;
// }
// writer.commit()
}

@ -7,117 +7,117 @@
//!
//! cargo run --example simple-store
extern crate rkv;
extern crate tempfile;
// extern crate rkv;
// extern crate tempfile;
use rkv::{
Manager,
Rkv,
Store,
Value,
};
use tempfile::Builder;
// use rkv::{
// Manager,
// Rkv,
// Store,
// Value,
// };
// use tempfile::Builder;
use std::fs;
// use std::fs;
fn main() {
let root = Builder::new().prefix("simple-db").tempdir().unwrap();
fs::create_dir_all(root.path()).unwrap();
let p = root.path();
// let root = Builder::new().prefix("simple-db").tempdir().unwrap();
// fs::create_dir_all(root.path()).unwrap();
// let p = root.path();
// The manager enforces that each process opens the same lmdb environment at most once
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
let k = created_arc.read().unwrap();
// // The manager enforces that each process opens the same lmdb environment at most once
// let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
// let k = created_arc.read().unwrap();
// Creates a store called "store"
let store: Store<&str> = k.open_or_create("store").unwrap();
// // Creates a store called "store"
// let store: Store<&str> = k.open_or_create("store").unwrap();
println!("Inserting data...");
{
// Use a write transaction to mutate the store
let mut writer = store.write(&k).unwrap();
writer.put("int", &Value::I64(1234)).unwrap();
writer.put("uint", &Value::U64(1234_u64)).unwrap();
writer.put("float", &Value::F64(1234.0.into())).unwrap();
writer.put("instant", &Value::Instant(1528318073700)).unwrap();
writer.put("boolean", &Value::Bool(true)).unwrap();
writer.put("string", &Value::Str("héllo, yöu")).unwrap();
writer.put("json", &Value::Json(r#"{"foo":"bar", "number": 1}"#)).unwrap();
writer.put("blob", &Value::Blob(b"blob")).unwrap();
writer.commit().unwrap();
}
// println!("Inserting data...");
// {
// // Use a write transaction to mutate the store
// let mut writer = store.write(&k).unwrap();
// writer.put("int", &Value::I64(1234)).unwrap();
// writer.put("uint", &Value::U64(1234_u64)).unwrap();
// writer.put("float", &Value::F64(1234.0.into())).unwrap();
// writer.put("instant", &Value::Instant(1528318073700)).unwrap();
// writer.put("boolean", &Value::Bool(true)).unwrap();
// writer.put("string", &Value::Str("héllo, yöu")).unwrap();
// writer.put("json", &Value::Json(r#"{"foo":"bar", "number": 1}"#)).unwrap();
// writer.put("blob", &Value::Blob(b"blob")).unwrap();
// writer.commit().unwrap();
// }
println!("Looking up keys...");
{
// Use a read transaction to query the store
let r = &k.read().unwrap();
println!("Get int {:?}", store.get(r, "int").unwrap());
println!("Get uint {:?}", store.get(r, "uint").unwrap());
println!("Get float {:?}", store.get(r, "float").unwrap());
println!("Get instant {:?}", store.get(r, "instant").unwrap());
println!("Get boolean {:?}", store.get(r, "boolean").unwrap());
println!("Get string {:?}", store.get(r, "string").unwrap());
println!("Get json {:?}", store.get(r, "json").unwrap());
println!("Get blob {:?}", store.get(r, "blob").unwrap());
println!("Get non-existent {:?}", store.get(r, "non-existent").unwrap());
}
// println!("Looking up keys...");
// {
// // Use a read transaction to query the store
// let r = &k.read().unwrap();
// println!("Get int {:?}", store.get(r, "int").unwrap());
// println!("Get uint {:?}", store.get(r, "uint").unwrap());
// println!("Get float {:?}", store.get(r, "float").unwrap());
// println!("Get instant {:?}", store.get(r, "instant").unwrap());
// println!("Get boolean {:?}", store.get(r, "boolean").unwrap());
// println!("Get string {:?}", store.get(r, "string").unwrap());
// println!("Get json {:?}", store.get(r, "json").unwrap());
// println!("Get blob {:?}", store.get(r, "blob").unwrap());
// println!("Get non-existent {:?}", store.get(r, "non-existent").unwrap());
// }
println!("Looking up keys via Reader.get()...");
{
// An alternate way to query the store.
let r = store.read(&k).expect("reader");
println!("Get int {:?}", r.get("int").unwrap());
println!("Get uint {:?}", r.get("uint").unwrap());
println!("Get float {:?}", r.get("float").unwrap());
println!("Get instant {:?}", r.get("instant").unwrap());
println!("Get boolean {:?}", r.get("boolean").unwrap());
println!("Get string {:?}", r.get("string").unwrap());
println!("Get json {:?}", r.get("json").unwrap());
println!("Get blob {:?}", r.get("blob").unwrap());
println!("Get non-existent {:?}", r.get("non-existent").unwrap());
}
// println!("Looking up keys via Reader.get()...");
// {
// // An alternate way to query the store.
// let r = store.read(&k).expect("reader");
// println!("Get int {:?}", r.get("int").unwrap());
// println!("Get uint {:?}", r.get("uint").unwrap());
// println!("Get float {:?}", r.get("float").unwrap());
// println!("Get instant {:?}", r.get("instant").unwrap());
// println!("Get boolean {:?}", r.get("boolean").unwrap());
// println!("Get string {:?}", r.get("string").unwrap());
// println!("Get json {:?}", r.get("json").unwrap());
// println!("Get blob {:?}", r.get("blob").unwrap());
// println!("Get non-existent {:?}", r.get("non-existent").unwrap());
// }
println!("Looking up keys via Writer.get()...");
{
let mut writer = store.write(&k).unwrap();
writer.put("foo", &Value::Str("bar")).unwrap();
writer.put("bar", &Value::Str("baz")).unwrap();
writer.delete("foo").unwrap();
println!("It should be None! ({:?})", writer.get("foo").unwrap());
println!("Get bar ({:?})", writer.get("bar").unwrap());
writer.commit().unwrap();
let reader = store.read(&k).expect("reader");
println!("It should be None! ({:?})", reader.get("foo").unwrap());
println!("Get bar {:?}", reader.get("bar").unwrap());
}
// println!("Looking up keys via Writer.get()...");
// {
// let mut writer = store.write(&k).unwrap();
// writer.put("foo", &Value::Str("bar")).unwrap();
// writer.put("bar", &Value::Str("baz")).unwrap();
// writer.delete("foo").unwrap();
// println!("It should be None! ({:?})", writer.get("foo").unwrap());
// println!("Get bar ({:?})", writer.get("bar").unwrap());
// writer.commit().unwrap();
// let reader = store.read(&k).expect("reader");
// println!("It should be None! ({:?})", reader.get("foo").unwrap());
// println!("Get bar {:?}", reader.get("bar").unwrap());
// }
println!("Aborting transaction...");
{
// Aborting a write transaction rollbacks the change(s)
let mut writer = store.write(&k).unwrap();
writer.put("foo", &Value::Str("bar")).unwrap();
writer.abort();
// println!("Aborting transaction...");
// {
// // Aborting a write transaction rollbacks the change(s)
// let mut writer = store.write(&k).unwrap();
// writer.put("foo", &Value::Str("bar")).unwrap();
// writer.abort();
let r = &k.read().unwrap();
println!("It should be None! ({:?})", store.get(r, "foo").unwrap());
// Explicitly aborting a transaction is not required unless an early
// abort is desired, since both read and write transactions will
// implicitly be aborted once they go out of scope.
}
// let r = &k.read().unwrap();
// println!("It should be None! ({:?})", store.get(r, "foo").unwrap());
// // Explicitly aborting a transaction is not required unless an early
// // abort is desired, since both read and write transactions will
// // implicitly be aborted once they go out of scope.
// }
println!("Deleting keys...");
{
// Deleting a key/value also requires a write transaction
let mut writer = store.write(&k).unwrap();
writer.put("foo", &Value::Str("bar")).unwrap();
writer.delete("foo").unwrap();
// Write transaction also supports read
println!("It should be None! ({:?})", writer.get("foo").unwrap());
writer.commit().unwrap();
// println!("Deleting keys...");
// {
// // Deleting a key/value also requires a write transaction
// let mut writer = store.write(&k).unwrap();
// writer.put("foo", &Value::Str("bar")).unwrap();
// writer.delete("foo").unwrap();
// // Write transaction also supports read
// println!("It should be None! ({:?})", writer.get("foo").unwrap());
// writer.commit().unwrap();
// Committing a transaction consumes the writer, preventing you
// from reusing it by failing and reporting a compile-time error.
// This line would report error[E0382]: use of moved value: `writer`.
// writer.put("baz", &Value::Str("buz")).unwrap();
}
// // Committing a transaction consumes the writer, preventing you
// // from reusing it by failing and reporting a compile-time error.
// // This line would report error[E0382]: use of moved value: `writer`.
// // writer.put("baz", &Value::Str("buz")).unwrap();
// }
}

@ -21,18 +21,20 @@ use lmdb::{
DatabaseFlags,
Environment,
EnvironmentBuilder,
RoTransaction,
RwTransaction,
};
use error::StoreError;
use integer::{
IntegerStore,
PrimitiveInt,
};
// use integer::{
// IntegerStore,
// PrimitiveInt,
// };
use readwrite::Store;
use readwrite::{
Reader,
Store,
Writer,
};
pub static DEFAULT_MAX_DBS: c_uint = 5;
@ -85,37 +87,34 @@ impl Rkv {
/// Store creation methods.
impl Rkv {
pub fn open_or_create_default(&self) -> Result<Store<&str>, StoreError> {
pub fn open_or_create_default(&self) -> Result<Store, StoreError> {
self.open_or_create(None)
}
pub fn open_or_create<'s, T, K>(&self, name: T) -> Result<Store<K>, StoreError>
pub fn open_or_create<'s, T>(&self, name: T) -> Result<Store, StoreError>
where
T: Into<Option<&'s str>>,
K: AsRef<[u8]>,
{
let flags = DatabaseFlags::empty();
self.open_or_create_with_flags(name, flags)
}
pub fn open_or_create_integer<'s, T, K>(&self, name: T) -> Result<IntegerStore<K>, StoreError>
where
T: Into<Option<&'s str>>,
K: PrimitiveInt,
{
let mut flags = DatabaseFlags::empty();
flags.toggle(DatabaseFlags::INTEGER_KEY);
let db = self.env.create_db(name.into(), flags).map_err(|e| match e {
lmdb::Error::BadRslot => StoreError::open_during_transaction(),
_ => e.into(),
})?;
Ok(IntegerStore::new(db))
}
pub fn open_or_create_with_flags<'s, T, K>(&self, name: T, flags: DatabaseFlags) -> Result<Store<K>, StoreError>
// pub fn open_or_create_integer<'s, T>(&self, name: T) -> Result<IntegerStore, StoreError>
// where
// T: Into<Option<&'s str>>,
// {
// let mut flags = DatabaseFlags::empty();
// flags.toggle(DatabaseFlags::INTEGER_KEY);
// let db = self.env.create_db(name.into(), flags).map_err(|e| match e {
// lmdb::Error::BadRslot => StoreError::open_during_transaction(),
// _ => e.into(),
// })?;
// Ok(IntegerStore::new(db))
// }
pub fn open_or_create_with_flags<'s, T>(&self, name: T, flags: DatabaseFlags) -> Result<Store, StoreError>
where
T: Into<Option<&'s str>>,
K: AsRef<[u8]>,
{
let db = self.env.create_db(name.into(), flags).map_err(|e| match e {
lmdb::Error::BadRslot => StoreError::open_during_transaction(),
@ -143,12 +142,20 @@ impl Rkv {
/// Read and write accessors.
impl Rkv {
pub fn read(&self) -> Result<RoTransaction, lmdb::Error> {
self.env.begin_ro_txn()
pub fn read<K>(&self) -> Result<Reader<K>, StoreError>
where
K: AsRef<[u8]>,
{
let txn = self.env.begin_ro_txn()?;
Ok(Reader::new(txn))
}
pub fn write(&self) -> Result<RwTransaction, lmdb::Error> {
self.env.begin_rw_txn()
pub fn write<K>(&self) -> Result<Writer<K>, StoreError>
where
K: AsRef<[u8]>,
{
let txn = self.env.begin_rw_txn()?;
Ok(Writer::new(txn))
}
}
@ -200,10 +207,10 @@ mod tests {
let k = Rkv::new(root.path()).expect("new succeeded");
let _ = k.open_or_create_default().expect("created default");
let yyy: Store<&str> = k.open_or_create("yyy").expect("opened");
let reader = yyy.read(&k).expect("reader");
let yyy = k.open_or_create("yyy").expect("opened");
let reader = k.read::<&str>().expect("reader");
let result = reader.get("foo");
let result = reader.get(&yyy, "foo");
assert_eq!(None, result.expect("success but no value"));
}
@ -213,98 +220,98 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let sk: Store = k.open_or_create("sk").expect("opened");
{
let mut writer = sk.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
writer.put("noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put("bar", &Value::Bool(true)).expect("wrote");
writer.put("baz", &Value::Str("héllo, yöu")).expect("wrote");
assert_eq!(writer.get("foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get("noo").expect("read"), Some(Value::F64(1234.0.into())));
assert_eq!(writer.get("bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(writer.get("baz").expect("read"), Some(Value::Str("héllo, yöu")));
let mut writer = k.write::<&str>().expect("writer");
writer.put(&sk, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&sk, "noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put(&sk, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&sk, "baz", &Value::Str("héllo, yöu")).expect("wrote");
assert_eq!(writer.get(&sk, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get(&sk, "noo").expect("read"), Some(Value::F64(1234.0.into())));
assert_eq!(writer.get(&sk, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(writer.get(&sk, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
// Isolation. Reads won't return values.
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), None);
assert_eq!(sk.get(r, "bar").expect("read"), None);
assert_eq!(sk.get(r, "baz").expect("read"), None);
let r = &k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), None);
assert_eq!(r.get(&sk, "bar").expect("read"), None);
assert_eq!(r.get(&sk, "baz").expect("read"), None);
}
// Dropped: tx rollback. Reads will still return nothing.
{
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), None);
assert_eq!(sk.get(r, "bar").expect("read"), None);
assert_eq!(sk.get(r, "baz").expect("read"), None);
let r = &k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), None);
assert_eq!(r.get(&sk, "bar").expect("read"), None);
assert_eq!(r.get(&sk, "baz").expect("read"), None);
}
{
let mut writer = sk.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
writer.put("bar", &Value::Bool(true)).expect("wrote");
writer.put("baz", &Value::Str("héllo, yöu")).expect("wrote");
assert_eq!(writer.get("foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get("bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(writer.get("baz").expect("read"), Some(Value::Str("héllo, yöu")));
let mut writer = k.write::<&str>().expect("writer");
writer.put(&sk, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&sk, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&sk, "baz", &Value::Str("héllo, yöu")).expect("wrote");
assert_eq!(writer.get(&sk, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get(&sk, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(writer.get(&sk, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
writer.commit().expect("committed");
}
// Committed. Reads will succeed.
{
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(sk.get(r, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(sk.get(r, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
let r = k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(r.get(&sk, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(r.get(&sk, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
{
let mut writer = sk.write(&k).expect("writer");
writer.delete("foo").expect("deleted");
writer.delete("bar").expect("deleted");
writer.delete("baz").expect("deleted");
assert_eq!(writer.get("foo").expect("read"), None);
assert_eq!(writer.get("bar").expect("read"), None);
assert_eq!(writer.get("baz").expect("read"), None);
let mut writer = k.write::<&str>().expect("writer");
writer.delete(&sk, "foo").expect("deleted");
writer.delete(&sk, "bar").expect("deleted");
writer.delete(&sk, "baz").expect("deleted");
assert_eq!(writer.get(&sk, "foo").expect("read"), None);
assert_eq!(writer.get(&sk, "bar").expect("read"), None);
assert_eq!(writer.get(&sk, "baz").expect("read"), None);
// Isolation. Reads still return values.
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(sk.get(r, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(sk.get(r, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
let r = k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(r.get(&sk, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(r.get(&sk, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Dropped: tx rollback. Reads will still return values.
{
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(sk.get(r, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(sk.get(r, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
let r = k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(r.get(&sk, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(r.get(&sk, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
{
let mut writer = sk.write(&k).expect("writer");
writer.delete("foo").expect("deleted");
writer.delete("bar").expect("deleted");
writer.delete("baz").expect("deleted");
assert_eq!(writer.get("foo").expect("read"), None);
assert_eq!(writer.get("bar").expect("read"), None);
assert_eq!(writer.get("baz").expect("read"), None);
let mut writer = k.write::<&str>().expect("writer");
writer.delete(&sk, "foo").expect("deleted");
writer.delete(&sk, "bar").expect("deleted");
writer.delete(&sk, "baz").expect("deleted");
assert_eq!(writer.get(&sk, "foo").expect("read"), None);
assert_eq!(writer.get(&sk, "bar").expect("read"), None);
assert_eq!(writer.get(&sk, "baz").expect("read"), None);
writer.commit().expect("committed");
}
// Committed. Reads will succeed but return None to indicate a missing value.
{
let r = &k.read().unwrap();
assert_eq!(sk.get(r, "foo").expect("read"), None);
assert_eq!(sk.get(r, "bar").expect("read"), None);
assert_eq!(sk.get(r, "baz").expect("read"), None);
let r = k.read::<&str>().unwrap();
assert_eq!(r.get(&sk, "foo").expect("read"), None);
assert_eq!(r.get(&sk, "bar").expect("read"), None);
assert_eq!(r.get(&sk, "baz").expect("read"), None);
}
}
@ -358,26 +365,26 @@ mod tests {
let root = Builder::new().prefix("test_read_before_write_num").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let sk: Store = k.open_or_create("sk").expect("opened");
// Test reading a number, modifying it, and then writing it back.
// We have to be done with the Value::I64 before calling Writer::put,
// as the Value::I64 borrows an immutable reference to the Writer.
// So we extract and copy its primitive value.
fn get_existing_foo(writer: &Writer<&str>) -> Option<i64> {
match writer.get("foo").expect("read") {
fn get_existing_foo(writer: &Writer<&str>, store: &Store) -> Option<i64> {
match writer.get(store, "foo").expect("read") {
Some(Value::I64(val)) => Some(val),
_ => None,
}
}
let mut writer = sk.write(&k).expect("writer");
let mut existing = get_existing_foo(&writer).unwrap_or(99);
let mut writer = k.write::<&str>().expect("writer");
let mut existing = get_existing_foo(&writer, &sk).unwrap_or(99);
existing += 1;
writer.put("foo", &Value::I64(existing)).expect("success");
writer.put(&sk, "foo", &Value::I64(existing)).expect("success");
let updated = get_existing_foo(&writer).unwrap_or(99);
let updated = get_existing_foo(&writer, &sk).unwrap_or(99);
assert_eq!(updated, 100);
writer.commit().expect("commit");
}
@ -387,20 +394,20 @@ mod tests {
let root = Builder::new().prefix("test_read_before_write_str").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let sk: Store = k.open_or_create("sk").expect("opened");
// Test reading a string, modifying it, and then writing it back.
// We have to be done with the Value::Str before calling Writer::put,
// as the Value::Str (and its underlying &str) borrows an immutable
// reference to the Writer. So we copy it to a String.
let mut writer = sk.write(&k).expect("writer");
let mut existing = match writer.get("foo").expect("read") {
let mut writer = k.write::<&str>().expect("writer");
let mut existing = match writer.get(&sk, "foo").expect("read") {
Some(Value::Str(val)) => val,
_ => "",
}.to_string();
existing.push('…');
writer.put("foo", &Value::Str(&existing)).expect("write");
writer.put(&sk, "foo", &Value::Str(&existing)).expect("write");
writer.commit().expect("commit");
}
@ -409,10 +416,9 @@ mod tests {
let root = Builder::new().prefix("test_concurrent_reads_prohibited").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s: Store<&str> = k.open_or_create("s").expect("opened");
let _first = s.read(&k).expect("reader");
let second = s.read(&k);
let _first = k.read::<&str>().expect("reader");
let second = k.read::<&str>();
match second {
Err(StoreError::ReadTransactionAlreadyExists(t)) => {
@ -429,46 +435,41 @@ mod tests {
let root = Builder::new().prefix("test_isolation").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s: Store<&str> = k.open_or_create("s").expect("opened");
let s: Store = k.open_or_create("s").expect("opened");
// Add one field.
{
let mut writer = s.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&s, "foo", &Value::I64(1234)).expect("wrote");
writer.commit().expect("committed");
}
// Both ways of reading see the value.
{
let reader = &k.read().unwrap();
assert_eq!(s.get(reader, "foo").expect("read"), Some(Value::I64(1234)));
}
{
let reader = s.read(&k).unwrap();
assert_eq!(reader.get("foo").expect("read"), Some(Value::I64(1234)));
let reader = k.read::<&str>().unwrap();
assert_eq!(reader.get(&s, "foo").expect("read"), Some(Value::I64(1234)));
}
// Establish a long-lived reader that outlasts a writer.
let reader = s.read(&k).expect("reader");
assert_eq!(reader.get("foo").expect("read"), Some(Value::I64(1234)));
let reader = k.read::<&str>().expect("reader");
assert_eq!(reader.get(&s, "foo").expect("read"), Some(Value::I64(1234)));
// Start a write transaction.
let mut writer = s.write(&k).expect("writer");
writer.put("foo", &Value::I64(999)).expect("wrote");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&s, "foo", &Value::I64(999)).expect("wrote");
// The reader and writer are isolated.
assert_eq!(reader.get("foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get("foo").expect("read"), Some(Value::I64(999)));
assert_eq!(reader.get(&s, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(writer.get(&s, "foo").expect("read"), Some(Value::I64(999)));
// If we commit the writer, we still have isolation.
writer.commit().expect("committed");
assert_eq!(reader.get("foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(reader.get(&s, "foo").expect("read"), Some(Value::I64(1234)));
// A new reader sees the committed value. Note that LMDB doesn't allow two
// read transactions to exist in the same thread, so we abort the previous one.
reader.abort();
let reader = s.read(&k).expect("reader");
assert_eq!(reader.get("foo").expect("read"), Some(Value::I64(999)));
let reader = k.read::<&str>().expect("reader");
assert_eq!(reader.get(&s, "foo").expect("read"), Some(Value::I64(999)));
}
#[test]
@ -476,12 +477,12 @@ mod tests {
let root = Builder::new().prefix("test_round_trip_blob").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let mut writer = sk.write(&k).expect("writer");
let sk: Store = k.open_or_create("sk").expect("opened");
let mut writer = k.write::<&str>().expect("writer");
assert_eq!(writer.get("foo").expect("read"), None);
writer.put("foo", &Value::Blob(&[1, 2, 3, 4])).expect("wrote");
assert_eq!(writer.get("foo").expect("read"), Some(Value::Blob(&[1, 2, 3, 4])));
assert_eq!(writer.get(&sk, "foo").expect("read"), None);
writer.put(&sk, "foo", &Value::Blob(&[1, 2, 3, 4])).expect("wrote");
assert_eq!(writer.get(&sk, "foo").expect("read"), Some(Value::Blob(&[1, 2, 3, 4])));
fn u16_to_u8(src: &[u16]) -> Vec<u8> {
let mut dst = vec![0; 2 * src.len()];
@ -499,9 +500,9 @@ mod tests {
// their [u16] backing storage to [u8]. Test that converting, writing,
// reading, and converting back works as expected.
let u16_array = [1000, 10000, 54321, 65535];
assert_eq!(writer.get("bar").expect("read"), None);
writer.put("bar", &Value::Blob(&u16_to_u8(&u16_array))).expect("wrote");
let u8_array = match writer.get("bar").expect("read") {
assert_eq!(writer.get(&sk, "bar").expect("read"), None);
writer.put(&sk, "bar", &Value::Blob(&u16_to_u8(&u16_array))).expect("wrote");
let u8_array = match writer.get(&sk, "bar").expect("read") {
Some(Value::Blob(val)) => val,
_ => &[],
};
@ -514,12 +515,12 @@ mod tests {
let root = Builder::new().prefix("test_delete_value").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create_with_flags("sk", DatabaseFlags::DUP_SORT).expect("opened");
let sk: Store = k.open_or_create_with_flags("sk", DatabaseFlags::DUP_SORT).expect("opened");
let mut writer = sk.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
writer.put("foo", &Value::I64(1235)).expect("wrote");
writer.delete_value("foo", &Value::I64(1234)).expect("deleted");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&sk, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&sk, "foo", &Value::I64(1235)).expect("wrote");
writer.delete_value(&sk, "foo", &Value::I64(1234)).expect("deleted");
}
#[test]
@ -527,28 +528,28 @@ mod tests {
let root = Builder::new().prefix("test_iter").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let sk: Store = k.open_or_create("sk").expect("opened");
// An iterator over an empty store returns no values.
{
let reader = sk.read(&k).unwrap();
let mut iter = reader.iter_start().unwrap();
let reader = k.read::<&str>().unwrap();
let mut iter = reader.iter_start(&sk).unwrap();
assert!(iter.next().is_none());
}
let mut writer = sk.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
writer.put("noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put("bar", &Value::Bool(true)).expect("wrote");
writer.put("baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.put("héllò, töűrîst", &Value::Str("Emil.RuleZ!")).expect("wrote");
writer.put("你好,遊客", &Value::Str("米克規則")).expect("wrote");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&sk, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&sk, "noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put(&sk, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&sk, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.put(&sk, "héllò, töűrîst", &Value::Str("Emil.RuleZ!")).expect("wrote");
writer.put(&sk, "你好,遊客", &Value::Str("米克規則")).expect("wrote");
writer.commit().expect("committed");
let reader = sk.read(&k).unwrap();
let reader = k.read::<&str>().unwrap();
// Reader.iter() returns (key, value) tuples ordered by key.
let mut iter = reader.iter_start().unwrap();
let mut iter = reader.iter_start(&sk).unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "bar");
assert_eq!(val.expect("value"), Some(Value::Bool(true)));
@ -575,7 +576,7 @@ mod tests {
// Reader.iter_from() begins iteration at the first key equal to
// or greater than the given key.
let mut iter = reader.iter_from("moo").unwrap();
let mut iter = reader.iter_from(&sk, "moo").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
@ -586,7 +587,7 @@ mod tests {
// Reader.iter_from() works as expected when the given key is a prefix
// of a key in the store.
let mut iter = reader.iter_from("no").unwrap();
let mut iter = reader.iter_from(&sk, "no").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
@ -602,16 +603,16 @@ mod tests {
let root = Builder::new().prefix("test_iter_from_key_greater_than_existing").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: Store<&str> = k.open_or_create("sk").expect("opened");
let sk: Store = k.open_or_create("sk").expect("opened");
let mut writer = sk.write(&k).expect("writer");
writer.put("foo", &Value::I64(1234)).expect("wrote");
writer.put("noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put("bar", &Value::Bool(true)).expect("wrote");
writer.put("baz", &Value::Str("héllo, yöu")).expect("wrote");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&sk, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&sk, "noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put(&sk, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&sk, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.commit().expect("committed");
let reader = sk.read(&k).unwrap();
let reader = k.read::<&str>().unwrap();
// There is no key greater than "nuu", so the underlying LMDB API panics
// when calling iter_from. This is unfortunate, and I've requested
@ -620,6 +621,158 @@ mod tests {
//
// Also see alternative https://github.com/danburkert/lmdb-rs/pull/30.
//
reader.iter_from("nuu").unwrap();
reader.iter_from(&sk, "nuu").unwrap();
}
#[test]
fn test_mutilpe_store_read_write() {
let root = Builder::new().prefix("test_mutilpe_store_read_write").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s1: Store = k.open_or_create("store_1").expect("opened");
let s2: Store = k.open_or_create("store_2").expect("opened");
let s3: Store = k.open_or_create("store_3").expect("opened");
let mut writer = k.write::<&str>().expect("writer");
writer.put(&s1, "foo", &Value::Str("bar")).expect("wrote");
writer.put(&s2, "foo", &Value::I64(123)).expect("wrote");
writer.put(&s3, "foo", &Value::Bool(true)).expect("wrote");
assert_eq!(writer.get(&s1, "foo").expect("read"), Some(Value::Str("bar")));
assert_eq!(writer.get(&s2, "foo").expect("read"), Some(Value::I64(123)));
assert_eq!(writer.get(&s3, "foo").expect("read"), Some(Value::Bool(true)));
writer.commit().expect("committed");
let reader = k.read::<&str>().expect("unbound_reader");
assert_eq!(reader.get(&s1, "foo").expect("read"), Some(Value::Str("bar")));
assert_eq!(reader.get(&s2, "foo").expect("read"), Some(Value::I64(123)));
assert_eq!(reader.get(&s3, "foo").expect("read"), Some(Value::Bool(true)));
reader.abort();
// test delete across multiple stores
let mut writer = k.write::<&str>().expect("writer");
writer.delete(&s1, "foo").expect("deleted");
writer.delete(&s2, "foo").expect("deleted");
writer.delete(&s3, "foo").expect("deleted");
writer.commit().expect("committed");
let reader = k.read::<&str>().expect("reader");
assert_eq!(reader.get(&s1, "key").expect("value"), None);
assert_eq!(reader.get(&s2, "key").expect("value"), None);
assert_eq!(reader.get(&s3, "key").expect("value"), None);
}
#[test]
fn test_multiple_store_iter() {
let root = Builder::new().prefix("test_multiple_store_iter").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s1: Store = k.open_or_create("store_1").expect("opened");
let s2: Store = k.open_or_create("store_2").expect("opened");
let mut writer = k.write::<&str>().expect("writer");
// Write to "s1"
writer.put(&s1, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&s1, "noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put(&s1, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&s1, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.put(&s1, "héllò, töűrîst", &Value::Str("Emil.RuleZ!")).expect("wrote");
writer.put(&s1, "你好,遊客", &Value::Str("米克規則")).expect("wrote");
// Writer to "s2"
writer.put(&s2, "foo", &Value::I64(1234)).expect("wrote");
writer.put(&s2, "noo", &Value::F64(1234.0.into())).expect("wrote");
writer.put(&s2, "bar", &Value::Bool(true)).expect("wrote");
writer.put(&s2, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.put(&s2, "héllò, töűrîst", &Value::Str("Emil.RuleZ!")).expect("wrote");
writer.put(&s2, "你好,遊客", &Value::Str("米克規則")).expect("wrote");
writer.commit().expect("committed");
let reader = k.read::<&str>().unwrap();
// Iterate through the whole store in "s1"
let mut iter = reader.iter_start(&s1).unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "bar");
assert_eq!(val.expect("value"), Some(Value::Bool(true)));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "baz");
assert_eq!(val.expect("value"), Some(Value::Str("héllo, yöu")));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "foo");
assert_eq!(val.expect("value"), Some(Value::I64(1234)));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "héllò, töűrîst");
assert_eq!(val.expect("value"), Some(Value::Str("Emil.RuleZ!")));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
// Iterate through the whole store in "s2"
let mut iter = reader.iter_start(&s2).unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "bar");
assert_eq!(val.expect("value"), Some(Value::Bool(true)));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "baz");
assert_eq!(val.expect("value"), Some(Value::Str("héllo, yöu")));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "foo");
assert_eq!(val.expect("value"), Some(Value::I64(1234)));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "héllò, töűrîst");
assert_eq!(val.expect("value"), Some(Value::Str("Emil.RuleZ!")));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
// Iterate from a given key in "s1"
let mut iter = reader.iter_from(&s1, "moo").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
// Iterate from a given key in "s2"
let mut iter = reader.iter_from(&s2, "moo").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
// Iterate from a given prefix in "s1"
let mut iter = reader.iter_from(&s1, "no").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
// Iterate from a given prefix in "s2"
let mut iter = reader.iter_from(&s2, "no").unwrap();
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "noo");
assert_eq!(val.expect("value"), Some(Value::F64(1234.0.into())));
let (key, val) = iter.next().unwrap();
assert_eq!(str::from_utf8(key).expect("key"), "你好,遊客");
assert_eq!(val.expect("value"), Some(Value::Str("米克規則")));
assert!(iter.next().is_none());
}
}

@ -8,174 +8,174 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::marker::PhantomData;
use bincode::serialize;
use lmdb::{
Database,
RoTransaction,
};
use serde::Serialize;
use error::{
DataError,
StoreError,
};
use value::Value;
use readwrite::{
Reader,
Store,
Writer,
};
use Rkv;
pub trait EncodableKey {
fn to_bytes(&self) -> Result<Vec<u8>, DataError>;
}
pub trait PrimitiveInt: EncodableKey {}
impl PrimitiveInt for u32 {}
impl<T> EncodableKey for T
where
T: Serialize,
{
fn to_bytes(&self) -> Result<Vec<u8>, DataError> {
serialize(self) // TODO: limited key length.
.map_err(|e| e.into())
}
}
struct Key<K> {
bytes: Vec<u8>,
phantom: PhantomData<K>,
}
impl<K> AsRef<[u8]> for Key<K>
where
K: EncodableKey,
{
fn as_ref(&self) -> &[u8] {
self.bytes.as_ref()
}
}
impl<K> Key<K>
where
K: EncodableKey,
{
fn new(k: K) -> Result<Key<K>, DataError> {
Ok(Key {
bytes: k.to_bytes()?,
phantom: PhantomData,
})
}
}
pub struct IntegerStore<K>
where
K: PrimitiveInt,
{
inner: Store<Key<K>>,
}
pub struct IntegerReader<'env, K>
where
K: PrimitiveInt,
{
inner: Reader<'env, Key<K>>,
}
impl<'env, K> IntegerReader<'env, K>
where
K: PrimitiveInt,
{
pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
self.inner.get(Key::new(k)?)
}
pub fn abort(self) {
self.inner.abort();
}
}
pub struct IntegerWriter<'env, K>
where
K: PrimitiveInt,
{
inner: Writer<'env, Key<K>>,
}
impl<'env, K> IntegerWriter<'env, K>
where
K: PrimitiveInt,
{
pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
self.inner.get(Key::new(k)?)
}
pub fn put<'s>(&'s mut self, k: K, v: &Value) -> Result<(), StoreError> {
self.inner.put(Key::new(k)?, v)
}
fn abort(self) {
self.inner.abort();
}
}
impl<K> IntegerStore<K>
where
K: PrimitiveInt,
{
pub fn new(db: Database) -> IntegerStore<K> {
IntegerStore {
inner: Store::new(db),
}
}
pub fn read<'env>(&self, env: &'env Rkv) -> Result<IntegerReader<'env, K>, StoreError> {
Ok(IntegerReader {
inner: self.inner.read(env)?,
})
}
pub fn write<'env>(&mut self, env: &'env Rkv) -> Result<IntegerWriter<'env, K>, StoreError> {
Ok(IntegerWriter {
inner: self.inner.write(env)?,
})
}
pub fn get<'env, 'tx>(&self, tx: &'tx RoTransaction<'env>, k: K) -> Result<Option<Value<'tx>>, StoreError> {
let key = Key::new(k)?;
self.inner.get(tx, key)
}
}
#[cfg(test)]
mod tests {
extern crate tempfile;
use self::tempfile::Builder;
use std::fs;
use super::*;
#[test]
fn test_integer_keys() {
let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let mut s: IntegerStore<u32> = k.open_or_create_integer("s").expect("open");
let mut writer = s.write(&k).expect("writer");
writer.put(123, &Value::Str("hello!")).expect("write");
assert_eq!(writer.get(123).expect("read"), Some(Value::Str("hello!")));
}
}
// use std::marker::PhantomData;
// use bincode::serialize;
// use lmdb::{
// Database,
// RoTransaction,
// };
// use serde::Serialize;
// use error::{
// DataError,
// StoreError,
// };
// use value::Value;
// use readwrite::{
// Reader,
// Store,
// Writer,
// };
// use Rkv;
// pub trait EncodableKey {
// fn to_bytes(&self) -> Result<Vec<u8>, DataError>;
// }
// pub trait PrimitiveInt: EncodableKey {}
// impl PrimitiveInt for u32 {}
// impl<T> EncodableKey for T
// where
// T: Serialize,
// {
// fn to_bytes(&self) -> Result<Vec<u8>, DataError> {
// serialize(self) // TODO: limited key length.
// .map_err(|e| e.into())
// }
// }
// struct Key<K> {
// bytes: Vec<u8>,
// phantom: PhantomData<K>,
// }
// impl<K> AsRef<[u8]> for Key<K>
// where
// K: EncodableKey,
// {
// fn as_ref(&self) -> &[u8] {
// self.bytes.as_ref()
// }
// }
// impl<K> Key<K>
// where
// K: EncodableKey,
// {
// fn new(k: K) -> Result<Key<K>, DataError> {
// Ok(Key {
// bytes: k.to_bytes()?,
// phantom: PhantomData,
// })
// }
// }
// pub struct IntegerStore<K>
// where
// K: PrimitiveInt,
// {
// inner: Store<Key<K>>,
// }
// pub struct IntegerReader<'env, K>
// where
// K: PrimitiveInt,
// {
// inner: Reader<'env, Key<K>>,
// }
// impl<'env, K> IntegerReader<'env, K>
// where
// K: PrimitiveInt,
// {
// pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
// self.inner.get(Key::new(k)?)
// }
// pub fn abort(self) {
// self.inner.abort();
// }
// }
// pub struct IntegerWriter<'env, K>
// where
// K: PrimitiveInt,
// {
// inner: Writer<'env, Key<K>>,
// }
// impl<'env, K> IntegerWriter<'env, K>
// where
// K: PrimitiveInt,
// {
// pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
// self.inner.get(Key::new(k)?)
// }
// pub fn put<'s>(&'s mut self, k: K, v: &Value) -> Result<(), StoreError> {
// self.inner.put(Key::new(k)?, v)
// }
// fn abort(self) {
// self.inner.abort();
// }
// }
// impl<K> IntegerStore<K>
// where
// K: PrimitiveInt,
// {
// pub fn new(db: Database) -> IntegerStore<K> {
// IntegerStore {
// inner: Store::new(db),
// }
// }
// pub fn read<'env>(&self, env: &'env Rkv) -> Result<IntegerReader<'env, K>, StoreError> {
// Ok(IntegerReader {
// inner: self.inner.read(env)?,
// })
// }
// pub fn write<'env>(&mut self, env: &'env Rkv) -> Result<IntegerWriter<'env, K>, StoreError> {
// Ok(IntegerWriter {
// inner: self.inner.write(env)?,
// })
// }
// pub fn get<'env, 'tx>(&self, tx: &'tx RoTransaction<'env>, k: K) -> Result<Option<Value<'tx>>, StoreError> {
// let key = Key::new(k)?;
// self.inner.get(tx, key)
// }
// }
// #[cfg(test)]
// mod tests {
// extern crate tempfile;
// use self::tempfile::Builder;
// use std::fs;
// use super::*;
// #[test]
// fn test_integer_keys() {
// let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
// fs::create_dir_all(root.path()).expect("dir created");
// let k = Rkv::new(root.path()).expect("new succeeded");
// let mut s: IntegerStore<u32> = k.open_or_create_integer("s").expect("open");
// let mut writer = s.write(&k).expect("writer");
// writer.put(123, &Value::Str("hello!")).expect("write");
// assert_eq!(writer.get(123).expect("read"), Some(Value::Str("hello!")));
// }
// }

@ -67,25 +67,26 @@
//!
//! // Call `Rkv.open_or_create_default()` to get a handle to the default
//! // (unnamed) store for the environment.
//! let store: Store<&str> = env.open_or_create_default().unwrap();
//! let store: Store = env.open_or_create_default().unwrap();
//!
//! {
//! // Use a write transaction to mutate the store by calling
//! // `Store.write()` to create a `Writer`. There can be only one
//! // `Rkv.write()` to create a `Writer`. There can be only one
//! // writer for a given store; opening a second one will block
//! // until the first completes.
//! let mut writer = store.write(&env).unwrap();
//! let mut writer = env.write::<&str>().unwrap();
//!
//! // Writer takes a `Store` reference as the first argument.
//! // Keys are `AsRef<[u8]>`, while values are `Value` enum instances.
//! // Use the `Blob` variant to store arbitrary collections of bytes.
//! writer.put("int", &Value::I64(1234)).unwrap();
//! writer.put("uint", &Value::U64(1234_u64)).unwrap();
//! writer.put("float", &Value::F64(1234.0.into())).unwrap();
//! writer.put("instant", &Value::Instant(1528318073700)).unwrap();
//! writer.put("boolean", &Value::Bool(true)).unwrap();
//! writer.put("string", &Value::Str("héllo, yöu")).unwrap();
//! writer.put("json", &Value::Json(r#"{"foo":"bar", "number": 1}"#)).unwrap();
//! writer.put("blob", &Value::Blob(b"blob")).unwrap();
//! writer.put(&store, "int", &Value::I64(1234)).unwrap();
//! writer.put(&store, "uint", &Value::U64(1234_u64)).unwrap();
//! writer.put(&store, "float", &Value::F64(1234.0.into())).unwrap();
//! writer.put(&store, "instant", &Value::Instant(1528318073700)).unwrap();
//! writer.put(&store, "boolean", &Value::Bool(true)).unwrap();
//! writer.put(&store, "string", &Value::Str("héllo, yöu")).unwrap();
//! writer.put(&store, "json", &Value::Json(r#"{"foo":"bar", "number": 1}"#)).unwrap();
//! writer.put(&store, "blob", &Value::Blob(b"blob")).unwrap();
//!
//! // You must commit a write transaction before the writer goes out
//! // of scope, or the transaction will abort and the data won't persist.
@ -93,24 +94,24 @@
//! }
//!
//! {
//! // Use a read transaction to query the store by calling `Store.read()`
//! // Use a read transaction to query the store by calling `Rkv.read()`
//! // to create a `Reader`. There can be unlimited concurrent readers
//! // for a store, and readers never block on a writer nor other readers.
//! let reader = store.read(&env).expect("reader");
//!
//! // To retrieve data, call `Reader.get()`, passing it the key
//! // for the value to retrieve.
//! println!("Get int {:?}", reader.get("int").unwrap());
//! println!("Get uint {:?}", reader.get("uint").unwrap());
//! println!("Get float {:?}", reader.get("float").unwrap());
//! println!("Get instant {:?}", reader.get("instant").unwrap());
//! println!("Get boolean {:?}", reader.get("boolean").unwrap());
//! println!("Get string {:?}", reader.get("string").unwrap());
//! println!("Get json {:?}", reader.get("json").unwrap());
//! println!("Get blob {:?}", reader.get("blob").unwrap());
//! let reader = env.read::<&str>().expect("reader");
//!
//! // To retrieve data, call `Reader.get()`, passing it the target store
//! // and the key for the value to retrieve.
//! println!("Get int {:?}", reader.get(&store, "int").unwrap());
//! println!("Get uint {:?}", reader.get(&store, "uint").unwrap());
//! println!("Get float {:?}", reader.get(&store, "float").unwrap());
//! println!("Get instant {:?}", reader.get(&store, "instant").unwrap());
//! println!("Get boolean {:?}", reader.get(&store, "boolean").unwrap());
//! println!("Get string {:?}", reader.get(&store, "string").unwrap());
//! println!("Get json {:?}", reader.get(&store, "json").unwrap());
//! println!("Get blob {:?}", reader.get(&store, "blob").unwrap());
//!
//! // Retrieving a non-existent value returns `Ok(None)`.
//! println!("Get non-existent value {:?}", reader.get("non-existent"));
//! println!("Get non-existent value {:?}", reader.get(&store, "non-existent"));
//!
//! // A read transaction will automatically close once the reader
//! // goes out of scope, so isn't necessary to close it explicitly,
@ -119,12 +120,12 @@
//!
//! {
//! // Aborting a write transaction rolls back the change(s).
//! let mut writer = store.write(&env).unwrap();
//! writer.put("foo", &Value::Str("bar")).unwrap();
//! let mut writer = env.write::<&str>().unwrap();
//! writer.put(&store, "foo", &Value::Str("bar")).unwrap();
//! writer.abort();
//!
//! let reader = store.read(&env).expect("reader");
//! println!("It should be None! ({:?})", reader.get("foo").unwrap());
//! let reader = env.read().expect("reader");
//! println!("It should be None! ({:?})", reader.get(&store, "foo").unwrap());
//! }
//!
//! {
@ -132,36 +133,36 @@
//! // abort is desired, since both read and write transactions will
//! // implicitly be aborted once they go out of scope.
//! {
//! let mut writer = store.write(&env).unwrap();
//! writer.put("foo", &Value::Str("bar")).unwrap();
//! let mut writer = env.write::<&str>().unwrap();
//! writer.put(&store, "foo", &Value::Str("bar")).unwrap();
//! }
//! let reader = store.read(&env).expect("reader");
//! println!("It should be None! ({:?})", reader.get("foo").unwrap());
//! let reader = env.read::<&str>().expect("reader");
//! println!("It should be None! ({:?})", reader.get(&store, "foo").unwrap());
//! }
//!
//! {
//! // Deleting a key/value pair also requires a write transaction.
//! let mut writer = store.write(&env).unwrap();
//! writer.put("foo", &Value::Str("bar")).unwrap();
//! writer.put("bar", &Value::Str("baz")).unwrap();
//! writer.delete("foo").unwrap();
//! let mut writer = env.write::<&str>().unwrap();
//! writer.put(&store, "foo", &Value::Str("bar")).unwrap();
//! writer.put(&store, "bar", &Value::Str("baz")).unwrap();
//! writer.delete(&store, "foo").unwrap();
//!
//! // A write transaction also supports reading, the version of the
//! // store that it reads includes changes it has made regardless of
//! // the commit state of that transaction.
//! // In the code above, "foo" and "bar" were put into the store,
//! // then "foo" was deleted so only "bar" will return a result.
//! println!("It should be None! ({:?})", writer.get("foo").unwrap());
//! println!("Get bar ({:?})", writer.get("bar").unwrap());
//! println!("It should be None! ({:?})", writer.get(&store, "foo").unwrap());
//! println!("Get bar ({:?})", writer.get(&store, "bar").unwrap());
//! writer.commit().unwrap();
//! let reader = store.read(&env).expect("reader");
//! println!("It should be None! ({:?})", reader.get("foo").unwrap());
//! println!("Get bar {:?}", reader.get("bar").unwrap());
//! let reader = env.read::<&str>().expect("reader");
//! println!("It should be None! ({:?})", reader.get(&store, "foo").unwrap());
//! println!("Get bar {:?}", reader.get(&store, "bar").unwrap());
//!
//! // Committing a transaction consumes the writer, preventing you
//! // from reusing it by failing at compile time with an error.
//! // This line would report error[E0382]: use of moved value: `writer`.
//! // writer.put("baz", &Value::Str("buz")).unwrap();
//! // writer.put(&store, "baz", &Value::Str("buz")).unwrap();
//! }
//! ```
@ -202,10 +203,10 @@ pub use error::{
StoreError,
};
pub use integer::{
IntegerStore,
PrimitiveInt,
};
// pub use integer::{
// IntegerStore,
// PrimitiveInt,
// };
pub use manager::Manager;

@ -28,8 +28,6 @@ use error::StoreError;
use value::Value;
use Rkv;
fn read_transform<'x>(val: Result<&'x [u8], lmdb::Error>) -> Result<Option<Value<'x>>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(Some).map_err(StoreError::DataError),
@ -43,7 +41,6 @@ where
K: AsRef<[u8]>,
{
tx: RwTransaction<'env>,
db: Database,
phantom: PhantomData<K>,
}
@ -52,7 +49,6 @@ where
K: AsRef<[u8]>,
{
tx: RoTransaction<'env>,
db: Database,
phantom: PhantomData<K>,
}
@ -65,23 +61,30 @@ impl<'env, K> Writer<'env, K>
where
K: AsRef<[u8]>,
{
pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
let bytes = self.tx.get(self.db, &k.as_ref());
pub fn new(txn: RwTransaction) -> Writer<K> {
Writer {
tx: txn,
phantom: PhantomData,
}
}
pub fn get<'s>(&'s self, store: &'s Store, k: K) -> Result<Option<Value<'s>>, StoreError> {
let bytes = self.tx.get(store.db, &k.as_ref());
read_transform(bytes)
}
// TODO: flags
pub fn put<'s>(&'s mut self, k: K, v: &Value) -> Result<(), StoreError> {
pub fn put<'s>(&'s mut self, store: &'s Store, k: K, v: &Value) -> Result<(), StoreError> {
// TODO: don't allocate twice.
let bytes = v.to_bytes()?;
self.tx.put(self.db, &k.as_ref(), &bytes, WriteFlags::empty()).map_err(StoreError::LmdbError)
self.tx.put(store.db, &k.as_ref(), &bytes, WriteFlags::empty()).map_err(StoreError::LmdbError)
}
pub fn delete<'s>(&'s mut self, k: K) -> Result<(), StoreError> {
self.tx.del(self.db, &k.as_ref(), None).map_err(StoreError::LmdbError)
pub fn delete<'s>(&'s mut self, store: &'s Store, k: K) -> Result<(), StoreError> {
self.tx.del(store.db, &k.as_ref(), None).map_err(StoreError::LmdbError)
}
pub fn delete_value<'s>(&'s mut self, _k: K, _v: &Value) -> Result<(), StoreError> {
pub fn delete_value<'s>(&'s mut self, _store: &'s Store, _k: K, _v: &Value) -> Result<(), StoreError> {
// Even better would be to make this a method only on a dupsort store —
// it would need a little bit of reorganizing of types and traits,
// but when I see "If the database does not support sorted duplicate
@ -103,8 +106,15 @@ impl<'env, K> Reader<'env, K>
where
K: AsRef<[u8]>,
{
pub fn get<'s>(&'s self, k: K) -> Result<Option<Value<'s>>, StoreError> {
let bytes = self.tx.get(self.db, &k.as_ref());
pub fn new(txn: RoTransaction) -> Reader<K> {
Reader {
tx: txn,
phantom: PhantomData,
}
}
pub fn get<'s>(&'s self, store: &'s Store, k: K) -> Result<Option<Value<'s>>, StoreError> {
let bytes = self.tx.get(store.db, &k.as_ref());
read_transform(bytes)
}
@ -112,8 +122,8 @@ where
self.tx.abort();
}
pub fn iter_start<'s>(&'s self) -> Result<Iter<'s>, StoreError> {
let mut cursor = self.tx.open_ro_cursor(self.db).map_err(StoreError::LmdbError)?;
pub fn iter_start<'s>(&'s self, store: &'s Store) -> Result<Iter<'s>, StoreError> {
let mut cursor = self.tx.open_ro_cursor(store.db).map_err(StoreError::LmdbError)?;
// We call Cursor.iter() instead of Cursor.iter_start() because
// the latter panics at "called `Result::unwrap()` on an `Err` value:
@ -131,8 +141,8 @@ where
})
}
pub fn iter_from<'s>(&'s self, k: K) -> Result<Iter<'s>, StoreError> {
let mut cursor = self.tx.open_ro_cursor(self.db).map_err(StoreError::LmdbError)?;
pub fn iter_from<'s>(&'s self, store: &'s Store, k: K) -> Result<Iter<'s>, StoreError> {
let mut cursor = self.tx.open_ro_cursor(store.db).map_err(StoreError::LmdbError)?;
let iter = cursor.iter_from(k);
Ok(Iter {
iter: iter,
@ -153,47 +163,14 @@ impl<'env> Iterator for Iter<'env> {
}
/// Wrapper around an `lmdb::Database`.
pub struct Store<K>
where
K: AsRef<[u8]>,
{
pub struct Store {
db: Database,
phantom: PhantomData<K>,
}
impl<K> Store<K>
where
K: AsRef<[u8]>,
{
pub fn new(db: Database) -> Store<K> {
impl Store {
pub fn new(db: Database) -> Store {
Store {
db: db,
phantom: PhantomData,
db,
}
}
pub fn read<'env>(&self, env: &'env Rkv) -> Result<Reader<'env, K>, StoreError> {
let tx = env.read()?;
Ok(Reader {
tx: tx,
db: self.db,
phantom: PhantomData,
})
}
/// Note: there may be only one write transaction active at any given time,
/// so this will block if any other writers currently exist for this store.
pub fn write<'env>(&self, env: &'env Rkv) -> Result<Writer<'env, K>, lmdb::Error> {
let tx = env.write()?;
Ok(Writer {
tx: tx,
db: self.db,
phantom: PhantomData,
})
}
pub fn get<'env, 'tx>(&self, tx: &'tx RoTransaction<'env>, k: K) -> Result<Option<Value<'tx>>, StoreError> {
let bytes = tx.get(self.db, &k.as_ref());
read_transform(bytes)
}
}

Loading…
Cancel
Save