Generalize LMDB usage and add support for multiple backing stores

Signed-off-by: Victor Porof <victor.porof@gmail.com>
Branch: without.crypto
parent 8b51df1952
commit a4d76eb250
  1. 18   examples/iterator.rs
  2. 21   examples/simple-store.rs
  3. 24   src/backend.rs
  4. 41   src/backend/common.rs
  5. 42   src/backend/impl_lmdb.rs
  6. 64   src/backend/impl_lmdb/cursor.rs
  7. 16   src/backend/impl_lmdb/database.rs
  8. 115  src/backend/impl_lmdb/environment.rs
  9. 34   src/backend/impl_lmdb/error.rs
  10. 127  src/backend/impl_lmdb/flags.rs
  11. 35   src/backend/impl_lmdb/info.rs
  12. 23   src/backend/impl_lmdb/iter.rs
  13. 39   src/backend/impl_lmdb/stat.rs
  14. 90   src/backend/impl_lmdb/transaction.rs
  15. 177  src/backend/traits.rs
  16. 15   src/bin/dump.rs
  17. 29   src/bin/rand.rs
  18. 276  src/env.rs
  19. 59   src/error.rs
  20. 44   src/helpers.rs
  21. 74   src/lib.rs
  22. 106  src/manager.rs
  23. 115  src/readwrite.rs
  24. 27   src/store.rs
  25. 54   src/store/integer.rs
  26. 84   src/store/integermulti.rs
  27. 157  src/store/multi.rs
  28. 100  src/store/single.rs
  29. 10   src/value.rs
  30. 12   tests/integer-store.rs
  31. 13   tests/manager.rs
  32. 12   tests/multi-integer-store.rs
  33. 62   tests/test_txn.rs

@ -7,6 +7,16 @@
//!
//! cargo run --example iterator
use std::fs;
use std::str;
use tempfile::Builder;
use rkv::backend::{
Lmdb,
LmdbDatabase,
LmdbEnvironment,
};
use rkv::{
Manager,
Rkv,
@ -15,17 +25,13 @@ use rkv::{
StoreOptions,
Value,
};
use tempfile::Builder;
use std::fs;
use std::str;
fn main() {
let root = Builder::new().prefix("iterator").tempdir().unwrap();
fs::create_dir_all(root.path()).unwrap();
let p = root.path();
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new::<Lmdb>).unwrap();
let k = created_arc.read().unwrap();
let store = k.open_single("store", StoreOptions::create()).unwrap();
@ -58,7 +64,7 @@ fn main() {
}
}
fn populate_store(k: &Rkv, store: SingleStore) -> Result<(), StoreError> {
fn populate_store(k: &Rkv<LmdbEnvironment>, store: SingleStore<LmdbDatabase>) -> Result<(), StoreError> {
let mut writer = k.write()?;
for (country, city) in vec![
("Canada", Value::Str("Ottawa")),

@ -7,17 +7,25 @@
//!
//! cargo run --example simple-store
use std::fs;
use tempfile::Builder;
use rkv::backend::{
BackendStat,
Lmdb,
LmdbDatabase,
LmdbRwTransaction,
};
use rkv::{
Manager,
MultiStore,
Rkv,
StoreOptions,
Value,
Writer,
};
use tempfile::Builder;
use std::fs;
type MultiStore = rkv::MultiStore<LmdbDatabase>;
type Writer<'env> = rkv::Writer<LmdbRwTransaction<'env>>;
fn getput<'env, 's>(store: MultiStore, writer: &'env mut Writer, ids: &'s mut Vec<String>) {
let keys = vec!["str1", "str2", "str3"];
@ -53,12 +61,11 @@ fn main() {
let p = root.path();
// The manager enforces that each process opens the same lmdb environment at most once
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).unwrap();
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new::<Lmdb>).unwrap();
let k = created_arc.read().unwrap();
// Creates a store called "store"
let store = k.open_single("store", StoreOptions::create()).unwrap();
let multistore = k.open_multi("multistore", StoreOptions::create()).unwrap();
println!("Inserting data...");
@ -95,6 +102,7 @@ fn main() {
delete(multistore, &mut writer);
writer.commit().unwrap();
}
println!("Looking up keys...");
{
// Use a reader to query the store
@ -179,5 +187,6 @@ fn main() {
println!("Get from store value: {:?}", store.get(&reader, "foo").unwrap());
println!("Get from another store value: {:?}", another_store.get(&reader, "foo").unwrap());
}
println!("Environment statistics: btree depth = {}", k.stat().unwrap().depth());
}

@ -0,0 +1,24 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
mod common;
mod impl_lmdb;
mod traits;
pub use common::*;
pub use traits::*;
pub use impl_lmdb::DatabaseImpl as LmdbDatabase;
pub use impl_lmdb::EnvironmentBuilderImpl as Lmdb;
pub use impl_lmdb::EnvironmentImpl as LmdbEnvironment;
pub use impl_lmdb::ErrorImpl as LmdbError;
pub use impl_lmdb::RoCursorImpl as LmdbRoCursor;
pub use impl_lmdb::RoTransactionImpl as LmdbRoTransaction;
pub use impl_lmdb::RwTransactionImpl as LmdbRwTransaction;
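A minimal consumer-side sketch of selecting the LMDB backend through these aliases; the helper name is illustrative and the single-store usage follows the examples above, with error handling simplified:

use rkv::backend::{Lmdb, LmdbEnvironment};
use rkv::{Rkv, StoreOptions, Value};

// Hypothetical helper: open an environment with the LMDB backend and write one value.
fn open_and_write(path: &std::path::Path) -> Result<(), rkv::StoreError> {
    // Rkv is now generic over the backend; the turbofish picks LMDB.
    let k: Rkv<LmdbEnvironment> = Rkv::new::<Lmdb>(path)?;
    let store = k.open_single("store", StoreOptions::create())?;
    let mut writer = k.write()?;
    store.put(&mut writer, "foo", &Value::Str("bar"))?;
    writer.commit()?;
    Ok(())
}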

@ -0,0 +1,41 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
#![allow(non_camel_case_types)]
pub enum EnvironmentFlags {
FIXED_MAP,
NO_SUB_DIR,
WRITE_MAP,
READ_ONLY,
NO_META_SYNC,
NO_SYNC,
MAP_ASYNC,
NO_TLS,
NO_LOCK,
NO_READAHEAD,
NO_MEM_INIT,
}
pub enum DatabaseFlags {
REVERSE_KEY,
DUP_SORT,
INTEGER_KEY,
DUP_FIXED,
INTEGER_DUP,
REVERSE_DUP,
}
pub enum WriteFlags {
NO_OVERWRITE,
NO_DUP_DATA,
CURRENT,
APPEND,
APPEND_DUP,
}
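These backend-agnostic flags are what callers pass around; each backend converts them into its own concrete flag type (see the Into impls in flags.rs below). A sketch following the pattern of the NO_SYNC test in src/env.rs; the helper name is illustrative:

use std::path::Path;
use rkv::backend::{BackendEnvironmentBuilder, EnvironmentFlags, Lmdb, LmdbEnvironment};
use rkv::{Rkv, StoreError};

// Set a common flag on the builder; set_flags accepts anything that converts
// Into the backend's concrete flag type.
fn open_no_sync(path: &Path) -> Result<Rkv<LmdbEnvironment>, StoreError> {
    let mut builder = Rkv::environment_builder::<Lmdb>();
    builder.set_max_dbs(2);
    builder.set_flags(EnvironmentFlags::NO_SYNC);
    Rkv::from_env(path, builder)
}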

@ -0,0 +1,42 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
mod cursor;
mod database;
mod environment;
mod error;
mod flags;
mod info;
mod iter;
mod stat;
mod transaction;
pub use cursor::{
RoCursorImpl,
RwCursorImpl,
};
pub use database::DatabaseImpl;
pub use environment::{
EnvironmentBuilderImpl,
EnvironmentImpl,
};
pub use error::ErrorImpl;
pub use flags::{
DatabaseFlagsImpl,
EnvironmentFlagsImpl,
WriteFlagsImpl,
};
pub use info::InfoImpl;
pub use iter::IterImpl;
pub use stat::StatImpl;
pub use transaction::{
RoTransactionImpl,
RwTransactionImpl,
};

@ -0,0 +1,64 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lmdb::Cursor;
use super::IterImpl;
use crate::backend::traits::BackendRoCursor;
#[derive(Debug)]
pub struct RoCursorImpl<'env>(pub(crate) lmdb::RoCursor<'env>);
impl<'env> BackendRoCursor<'env> for RoCursorImpl<'env> {
type Iter = IterImpl<'env>;
fn iter(&mut self) -> Self::Iter {
IterImpl(self.0.iter())
}
fn iter_from<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>,
{
IterImpl(self.0.iter_from(key))
}
fn iter_dup_of<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>,
{
IterImpl(self.0.iter_dup_of(key))
}
}
#[derive(Debug)]
pub struct RwCursorImpl<'env>(pub(crate) lmdb::RwCursor<'env>);
impl<'env> BackendRoCursor<'env> for RwCursorImpl<'env> {
type Iter = IterImpl<'env>;
fn iter(&mut self) -> Self::Iter {
IterImpl(self.0.iter())
}
fn iter_from<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>,
{
IterImpl(self.0.iter_from(key))
}
fn iter_dup_of<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>,
{
IterImpl(self.0.iter_dup_of(key))
}
}

@ -0,0 +1,16 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::backend::traits::BackendDatabase;
#[derive(Debug, Eq, PartialEq, Copy, Clone)]
pub struct DatabaseImpl(pub(crate) lmdb::Database);
impl BackendDatabase for DatabaseImpl {}

@ -0,0 +1,115 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::path::Path;
use super::{
DatabaseFlagsImpl,
DatabaseImpl,
EnvironmentFlagsImpl,
ErrorImpl,
InfoImpl,
RoTransactionImpl,
RwTransactionImpl,
StatImpl,
};
use crate::backend::traits::{
BackendEnvironment,
BackendEnvironmentBuilder,
};
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct EnvironmentBuilderImpl(lmdb::EnvironmentBuilder);
impl<'env> BackendEnvironmentBuilder<'env> for EnvironmentBuilderImpl {
type Error = ErrorImpl;
type Environment = EnvironmentImpl;
type Flags = EnvironmentFlagsImpl;
fn new() -> EnvironmentBuilderImpl {
EnvironmentBuilderImpl(lmdb::Environment::new())
}
fn set_flags<T>(&mut self, flags: T) -> &mut Self
where
T: Into<Self::Flags>,
{
self.0.set_flags(flags.into().0);
self
}
fn set_max_readers(&mut self, max_readers: u32) -> &mut Self {
self.0.set_max_readers(max_readers);
self
}
fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self {
self.0.set_max_dbs(max_dbs);
self
}
fn set_map_size(&mut self, size: usize) -> &mut Self {
self.0.set_map_size(size);
self
}
fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
self.0.open(path).map(EnvironmentImpl).map_err(ErrorImpl)
}
}
#[derive(Debug)]
pub struct EnvironmentImpl(lmdb::Environment);
impl<'env> BackendEnvironment<'env> for EnvironmentImpl {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Flags = DatabaseFlagsImpl;
type Stat = StatImpl;
type Info = InfoImpl;
type RoTransaction = RoTransactionImpl<'env>;
type RwTransaction = RwTransactionImpl<'env>;
fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
self.0.open_db(name).map(DatabaseImpl).map_err(ErrorImpl)
}
fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
self.0.create_db(name, flags.0).map(DatabaseImpl).map_err(ErrorImpl)
}
fn begin_ro_txn(&'env self) -> Result<Self::RoTransaction, Self::Error> {
self.0.begin_ro_txn().map(RoTransactionImpl).map_err(ErrorImpl)
}
fn begin_rw_txn(&'env self) -> Result<Self::RwTransaction, Self::Error> {
self.0.begin_rw_txn().map(RwTransactionImpl).map_err(ErrorImpl)
}
fn sync(&self, force: bool) -> Result<(), Self::Error> {
self.0.sync(force).map_err(ErrorImpl)
}
fn stat(&self) -> Result<Self::Stat, Self::Error> {
self.0.stat().map(StatImpl).map_err(ErrorImpl)
}
fn info(&self) -> Result<Self::Info, Self::Error> {
self.0.info().map(InfoImpl).map_err(ErrorImpl)
}
fn freelist(&self) -> Result<usize, Self::Error> {
self.0.freelist().map_err(ErrorImpl)
}
fn set_map_size(&self, size: usize) -> Result<(), Self::Error> {
self.0.set_map_size(size).map_err(ErrorImpl)
}
}
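For illustration only, since the rest of the crate goes through Rkv rather than these impls directly, a sketch of driving the LMDB backend straight through the traits; the database name and the default (empty) database flags are assumptions:

use std::path::Path;
use rkv::backend::{
    BackendEnvironment,
    BackendEnvironmentBuilder,
    BackendRoTransaction,
    Lmdb,
    LmdbError,
};

// Build, open, and read one key using only the backend layer.
fn peek(path: &Path, key: &[u8]) -> Result<(), LmdbError> {
    let mut builder = Lmdb::new();
    builder.set_max_dbs(1);
    let env = builder.open(path)?;
    let db = env.create_db(Some("raw"), Default::default())?;
    let txn = env.begin_ro_txn()?;
    match txn.get(db, key) {
        Ok(bytes) => println!("found {} bytes", bytes.len()),
        Err(e) => println!("lookup failed: {}", e),
    }
    txn.abort();
    Ok(())
}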

@ -0,0 +1,34 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fmt;
use crate::backend::traits::BackendError;
use crate::error::StoreError;
#[derive(Debug)]
pub struct ErrorImpl(pub(crate) lmdb::Error);
impl BackendError for ErrorImpl {}
impl fmt::Display for ErrorImpl {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}
impl Into<StoreError> for ErrorImpl {
fn into(self) -> StoreError {
match self.0 {
lmdb::Error::NotFound => StoreError::KeyValuePairNotFound,
_ => StoreError::LmdbError(self.0),
}
}
}
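The Into<StoreError> bridge is what keeps the rest of the crate backend-agnostic at the error level. A small sketch of consuming it, with variant names taken from src/error.rs below:

use rkv::backend::LmdbError;
use rkv::StoreError;

// Convert a backend error into the crate-level error and classify it.
fn describe(e: LmdbError) -> &'static str {
    let e: StoreError = e.into();
    match e {
        StoreError::KeyValuePairNotFound => "no such key",
        StoreError::LmdbError(_) => "some other LMDB failure",
        _ => "other backing store error",
    }
}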

@ -0,0 +1,127 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::backend::common::{
DatabaseFlags,
EnvironmentFlags,
WriteFlags,
};
use crate::backend::traits::{
BackendDatabaseFlags,
BackendEnvironmentFlags,
BackendFlags,
BackendWriteFlags,
};
#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)]
pub struct EnvironmentFlagsImpl(pub(crate) lmdb::EnvironmentFlags);
impl BackendFlags for EnvironmentFlagsImpl {
fn empty() -> EnvironmentFlagsImpl {
EnvironmentFlagsImpl(lmdb::EnvironmentFlags::empty())
}
}
impl BackendEnvironmentFlags for EnvironmentFlagsImpl {
fn set(&mut self, flag: EnvironmentFlags, value: bool) {
self.0.set(flag.into(), value)
}
}
impl Into<EnvironmentFlagsImpl> for EnvironmentFlags {
fn into(self) -> EnvironmentFlagsImpl {
EnvironmentFlagsImpl(self.into())
}
}
impl Into<lmdb::EnvironmentFlags> for EnvironmentFlags {
fn into(self) -> lmdb::EnvironmentFlags {
match self {
EnvironmentFlags::FIXED_MAP => lmdb::EnvironmentFlags::FIXED_MAP,
EnvironmentFlags::NO_SUB_DIR => lmdb::EnvironmentFlags::NO_SUB_DIR,
EnvironmentFlags::WRITE_MAP => lmdb::EnvironmentFlags::WRITE_MAP,
EnvironmentFlags::READ_ONLY => lmdb::EnvironmentFlags::READ_ONLY,
EnvironmentFlags::NO_META_SYNC => lmdb::EnvironmentFlags::NO_META_SYNC,
EnvironmentFlags::NO_SYNC => lmdb::EnvironmentFlags::NO_SYNC,
EnvironmentFlags::MAP_ASYNC => lmdb::EnvironmentFlags::MAP_ASYNC,
EnvironmentFlags::NO_TLS => lmdb::EnvironmentFlags::NO_TLS,
EnvironmentFlags::NO_LOCK => lmdb::EnvironmentFlags::NO_LOCK,
EnvironmentFlags::NO_READAHEAD => lmdb::EnvironmentFlags::NO_READAHEAD,
EnvironmentFlags::NO_MEM_INIT => lmdb::EnvironmentFlags::NO_MEM_INIT,
}
}
}
#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)]
pub struct DatabaseFlagsImpl(pub(crate) lmdb::DatabaseFlags);
impl BackendFlags for DatabaseFlagsImpl {
fn empty() -> DatabaseFlagsImpl {
DatabaseFlagsImpl(lmdb::DatabaseFlags::empty())
}
}
impl BackendDatabaseFlags for DatabaseFlagsImpl {
fn set(&mut self, flag: DatabaseFlags, value: bool) {
self.0.set(flag.into(), value)
}
}
impl Into<DatabaseFlagsImpl> for DatabaseFlags {
fn into(self) -> DatabaseFlagsImpl {
DatabaseFlagsImpl(self.into())
}
}
impl Into<lmdb::DatabaseFlags> for DatabaseFlags {
fn into(self) -> lmdb::DatabaseFlags {
match self {
DatabaseFlags::REVERSE_KEY => lmdb::DatabaseFlags::REVERSE_KEY,
DatabaseFlags::DUP_SORT => lmdb::DatabaseFlags::DUP_SORT,
DatabaseFlags::INTEGER_KEY => lmdb::DatabaseFlags::INTEGER_KEY,
DatabaseFlags::DUP_FIXED => lmdb::DatabaseFlags::DUP_FIXED,
DatabaseFlags::INTEGER_DUP => lmdb::DatabaseFlags::INTEGER_DUP,
DatabaseFlags::REVERSE_DUP => lmdb::DatabaseFlags::REVERSE_DUP,
}
}
}
#[derive(Debug, Eq, PartialEq, Copy, Clone, Default)]
pub struct WriteFlagsImpl(pub(crate) lmdb::WriteFlags);
impl BackendFlags for WriteFlagsImpl {
fn empty() -> WriteFlagsImpl {
WriteFlagsImpl(lmdb::WriteFlags::empty())
}
}
impl BackendWriteFlags for WriteFlagsImpl {
fn set(&mut self, flag: WriteFlags, value: bool) {
self.0.set(flag.into(), value)
}
}
impl Into<WriteFlagsImpl> for WriteFlags {
fn into(self) -> WriteFlagsImpl {
WriteFlagsImpl(self.into())
}
}
impl Into<lmdb::WriteFlags> for WriteFlags {
fn into(self) -> lmdb::WriteFlags {
match self {
WriteFlags::NO_OVERWRITE => lmdb::WriteFlags::NO_OVERWRITE,
WriteFlags::NO_DUP_DATA => lmdb::WriteFlags::NO_DUP_DATA,
WriteFlags::CURRENT => lmdb::WriteFlags::CURRENT,
WriteFlags::APPEND => lmdb::WriteFlags::APPEND,
WriteFlags::APPEND_DUP => lmdb::WriteFlags::APPEND_DUP,
}
}
}

@ -0,0 +1,35 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::backend::traits::BackendInfo;
pub struct InfoImpl(pub(crate) lmdb::Info);
impl BackendInfo for InfoImpl {
fn map_size(&self) -> usize {
self.0.map_size()
}
fn last_pgno(&self) -> usize {
self.0.last_pgno()
}
fn last_txnid(&self) -> usize {
self.0.last_txnid()
}
fn max_readers(&self) -> usize {
self.0.max_readers() as usize
}
fn num_readers(&self) -> usize {
self.0.num_readers() as usize
}
}

@ -0,0 +1,23 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use super::ErrorImpl;
use crate::backend::traits::BackendIter;
pub struct IterImpl<'env>(pub(crate) lmdb::Iter<'env>);
impl<'env> BackendIter<'env> for IterImpl<'env> {
type Error = ErrorImpl;
#[allow(clippy::type_complexity)]
fn next(&mut self) -> Option<Result<(&'env [u8], &'env [u8]), Self::Error>> {
self.0.next().map(|e| e.map_err(ErrorImpl))
}
}

@ -0,0 +1,39 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::backend::traits::BackendStat;
pub struct StatImpl(pub(crate) lmdb::Stat);
impl BackendStat for StatImpl {
fn page_size(&self) -> usize {
self.0.page_size() as usize
}
fn depth(&self) -> usize {
self.0.depth() as usize
}
fn branch_pages(&self) -> usize {
self.0.branch_pages()
}
fn leaf_pages(&self) -> usize {
self.0.leaf_pages()
}
fn overflow_pages(&self) -> usize {
self.0.overflow_pages()
}
fn entries(&self) -> usize {
self.0.entries()
}
}
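Since Rkv::stat() and Rkv::info() now return the backend's Stat and Info types, callers bring BackendStat and BackendInfo into scope to read the numbers. A sketch mirroring the load-ratio arithmetic in src/env.rs; the helper name is illustrative:

use rkv::backend::{BackendInfo, BackendStat, LmdbEnvironment};
use rkv::{Rkv, StoreError};

// Report a few environment numbers; the trait methods mirror lmdb::Stat and lmdb::Info.
fn print_usage(k: &Rkv<LmdbEnvironment>) -> Result<(), StoreError> {
    let stat = k.stat()?;
    let info = k.info()?;
    let total_pages = info.map_size() / stat.page_size();
    println!(
        "entries = {}, btree depth = {}, total pages = {}",
        stat.entries(),
        stat.depth(),
        total_pages
    );
    Ok(())
}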

@ -0,0 +1,90 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lmdb::Transaction;
use super::{
DatabaseImpl,
ErrorImpl,
RoCursorImpl,
WriteFlagsImpl,
};
use crate::backend::traits::{
BackendRoCursorTransaction,
BackendRoTransaction,
BackendRwCursorTransaction,
BackendRwTransaction,
};
#[derive(Debug)]
pub struct RoTransactionImpl<'env>(pub(crate) lmdb::RoTransaction<'env>);
impl<'env> BackendRoTransaction for RoTransactionImpl<'env> {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Flags = WriteFlagsImpl;
fn get(&self, db: Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> {
self.0.get(db.0, &key).map_err(ErrorImpl)
}
fn abort(self) {
self.0.abort()
}
}
impl<'env> BackendRoCursorTransaction<'env> for RoTransactionImpl<'env> {
type RoCursor = RoCursorImpl<'env>;
fn open_ro_cursor(&'env self, db: Self::Database) -> Result<Self::RoCursor, Self::Error> {
self.0.open_ro_cursor(db.0).map(RoCursorImpl).map_err(ErrorImpl)
}
}
#[derive(Debug)]
pub struct RwTransactionImpl<'env>(pub(crate) lmdb::RwTransaction<'env>);
impl<'env> BackendRwTransaction for RwTransactionImpl<'env> {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Flags = WriteFlagsImpl;
fn get(&self, db: Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> {
self.0.get(db.0, &key).map_err(ErrorImpl)
}
fn put(&mut self, db: Self::Database, key: &[u8], value: &[u8], flags: Self::Flags) -> Result<(), Self::Error> {
self.0.put(db.0, &key, &value, flags.0).map_err(ErrorImpl)
}
fn del(&mut self, db: Self::Database, key: &[u8], value: Option<&[u8]>) -> Result<(), Self::Error> {
self.0.del(db.0, &key, value).map_err(ErrorImpl)
}
fn clear_db(&mut self, db: Self::Database) -> Result<(), Self::Error> {
self.0.clear_db(db.0).map_err(ErrorImpl)
}
fn commit(self) -> Result<(), Self::Error> {
self.0.commit().map_err(ErrorImpl)
}
fn abort(self) {
self.0.abort()
}
}
impl<'env> BackendRwCursorTransaction<'env> for RwTransactionImpl<'env> {
type RoCursor = RoCursorImpl<'env>;
fn open_ro_cursor(&'env self, db: Self::Database) -> Result<Self::RoCursor, Self::Error> {
self.0.open_ro_cursor(db.0).map(RoCursorImpl).map_err(ErrorImpl)
}
}
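For completeness, a sketch of writing through the transaction traits directly; passing Default::default() for the write flags (an empty flag set) is an assumption about what the higher-level Writer does internally:

use rkv::backend::{
    BackendEnvironment,
    BackendRwTransaction,
    LmdbDatabase,
    LmdbEnvironment,
    LmdbError,
};

// Put one key/value pair and commit, using only the backend traits.
fn put_raw(env: &LmdbEnvironment, db: LmdbDatabase, key: &[u8], value: &[u8]) -> Result<(), LmdbError> {
    let mut txn = env.begin_rw_txn()?;
    txn.put(db, key, value, Default::default())?;
    txn.commit()
}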

@ -0,0 +1,177 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fmt::{
Debug,
Display,
};
use std::path::Path;
use crate::backend::common::{
DatabaseFlags,
EnvironmentFlags,
WriteFlags,
};
use crate::error::StoreError;
pub trait BackendError: Debug + Display + Into<StoreError> {}
pub trait BackendDatabase: Debug + Eq + PartialEq + Copy + Clone {}
pub trait BackendFlags: Debug + Eq + PartialEq + Copy + Clone + Default {
fn empty() -> Self;
}
pub trait BackendEnvironmentFlags: BackendFlags {
fn set(&mut self, flag: EnvironmentFlags, value: bool);
}
pub trait BackendDatabaseFlags: BackendFlags {
fn set(&mut self, flag: DatabaseFlags, value: bool);
}
pub trait BackendWriteFlags: BackendFlags {
fn set(&mut self, flag: WriteFlags, value: bool);
}
pub trait BackendStat {
fn page_size(&self) -> usize;
fn depth(&self) -> usize;
fn branch_pages(&self) -> usize;
fn leaf_pages(&self) -> usize;
fn overflow_pages(&self) -> usize;
fn entries(&self) -> usize;
}
pub trait BackendInfo {
fn map_size(&self) -> usize;
fn last_pgno(&self) -> usize;
fn last_txnid(&self) -> usize;
fn max_readers(&self) -> usize;
fn num_readers(&self) -> usize;
}
pub trait BackendEnvironmentBuilder<'env>: Debug + Eq + PartialEq + Copy + Clone {
type Error: BackendError;
type Environment: BackendEnvironment<'env>;
type Flags: BackendEnvironmentFlags;
fn new() -> Self;
fn set_flags<T>(&mut self, flags: T) -> &mut Self
where
T: Into<Self::Flags>;
fn set_max_dbs(&mut self, max_dbs: u32) -> &mut Self;
fn set_max_readers(&mut self, max_readers: u32) -> &mut Self;
fn set_map_size(&mut self, size: usize) -> &mut Self;
fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}
pub trait BackendEnvironment<'env>: Debug {
type Error: BackendError;
type Database: BackendDatabase;
type Flags: BackendDatabaseFlags;
type Stat: BackendStat;
type Info: BackendInfo;
type RoTransaction: BackendRoCursorTransaction<'env, Database = Self::Database>;
type RwTransaction: BackendRwCursorTransaction<'env, Database = Self::Database>;
fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error>;
fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error>;
fn begin_ro_txn(&'env self) -> Result<Self::RoTransaction, Self::Error>;
fn begin_rw_txn(&'env self) -> Result<Self::RwTransaction, Self::Error>;
fn sync(&self, force: bool) -> Result<(), Self::Error>;
fn stat(&self) -> Result<Self::Stat, Self::Error>;
fn info(&self) -> Result<Self::Info, Self::Error>;
fn freelist(&self) -> Result<usize, Self::Error>;
fn set_map_size(&self, size: usize) -> Result<(), Self::Error>;
}
pub trait BackendRoTransaction: Debug {
type Error: BackendError;
type Database: BackendDatabase;
type Flags: BackendWriteFlags;
fn get(&self, db: Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>;
fn abort(self);
}
pub trait BackendRwTransaction: Debug {
type Error: BackendError;
type Database: BackendDatabase;
type Flags: BackendWriteFlags;
fn get(&self, db: Self::Database, key: &[u8]) -> Result<&[u8], Self::Error>;
fn put(&mut self, db: Self::Database, key: &[u8], value: &[u8], flags: Self::Flags) -> Result<(), Self::Error>;
fn del(&mut self, db: Self::Database, key: &[u8], value: Option<&[u8]>) -> Result<(), Self::Error>;
fn clear_db(&mut self, db: Self::Database) -> Result<(), Self::Error>;
fn commit(self) -> Result<(), Self::Error>;
fn abort(self);
}
pub trait BackendRoCursorTransaction<'env>: BackendRoTransaction {
type RoCursor: BackendRoCursor<'env>;
fn open_ro_cursor(&'env self, db: Self::Database) -> Result<Self::RoCursor, Self::Error>;
}
pub trait BackendRwCursorTransaction<'env>: BackendRwTransaction {
type RoCursor: BackendRoCursor<'env>;
fn open_ro_cursor(&'env self, db: Self::Database) -> Result<Self::RoCursor, Self::Error>;
}
pub trait BackendRoCursor<'env>: Debug {
type Iter: BackendIter<'env>;
fn iter(&mut self) -> Self::Iter;
fn iter_from<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>;
fn iter_dup_of<K>(&mut self, key: K) -> Self::Iter
where
K: AsRef<[u8]>;
}
pub trait BackendIter<'env> {
type Error: BackendError;
#[allow(clippy::type_complexity)]
fn next(&mut self) -> Option<Result<(&'env [u8], &'env [u8]), Self::Error>>;
}
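These traits are what make downstream code backend-agnostic: anything written against them compiles for any backing store. A hedged sketch of a generic open helper, with the trait bounds copied from Rkv::new in src/env.rs below and the function name purely illustrative:

use std::path::Path;
use rkv::backend::{BackendEnvironment, BackendEnvironmentBuilder};
use rkv::{Rkv, StoreError};

// Generic over the backing store: B picks the backend, E is its environment type.
// Usage (LMDB today; other backends would slot in via the same bound):
//     let k = open_default::<rkv::backend::Lmdb, _>(path)?;
fn open_default<'env, B, E>(path: &Path) -> Result<Rkv<E>, StoreError>
where
    B: BackendEnvironmentBuilder<'env, Environment = E>,
    E: BackendEnvironment<'env>,
{
    Rkv::new::<B>(path)
}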

@ -8,17 +8,12 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
extern crate rkv;
use std::env::args;
use std::io;
use std::path::Path;
use rkv::{
error::MigrateError,
migrate::Migrator,
};
use std::{
env::args,
io,
path::Path,
};
use rkv::migrate::Migrator;
use rkv::MigrateError;
fn main() -> Result<(), MigrateError> {
let mut cli_args = args();

@ -14,23 +14,21 @@
//! the number of key/value pairs to create via the `-n <number>` flag
//! (for which the default value is 50).
extern crate rkv;
use std::env::args;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::Path;
use rkv::backend::{
BackendEnvironmentBuilder,
Lmdb,
};
use rkv::{
Rkv,
SingleStore,
StoreOptions,
Value,
};
use std::{
env::args,
fs::{
create_dir_all,
File,
},
io::Read,
path::Path,
};
fn main() {
let mut args = args();
@ -69,19 +67,18 @@ fn main() {
if path.is_none() {
panic!("must provide a path to the LMDB environment");
}
let path = path.unwrap();
create_dir_all(&path).expect("dir created");
let path = path.unwrap();
fs::create_dir_all(&path).expect("dir created");
let mut builder = Rkv::environment_builder();
let mut builder = Rkv::environment_builder::<Lmdb>();
builder.set_max_dbs(2);
// Allocate enough map to accommodate the largest random collection.
// We currently do this by allocating twice the maximum possible size
// of the pairs (assuming maximum key and value sizes).
builder.set_map_size((511 + 65535) * num_pairs * 2);
let rkv = Rkv::from_env(Path::new(&path), builder).expect("Rkv");
let store: SingleStore =
rkv.open_single(database.as_ref().map(|x| x.as_str()), StoreOptions::create()).expect("opened");
let store = rkv.open_single(database.as_ref().map(|x| x.as_str()), StoreOptions::create()).expect("opened");
let mut writer = rkv.write().expect("writer");
// Generate random values for the number of keys and key/value lengths.

@ -9,80 +9,91 @@
// specific language governing permissions and limitations under the License.
use std::os::raw::c_uint;
use std::path::{
Path,
PathBuf,
};
use lmdb;
use lmdb::{
Database,
use crate::backend::{
BackendDatabaseFlags,
BackendEnvironment,
BackendEnvironmentBuilder,
BackendInfo,
BackendRoCursorTransaction,
BackendRwCursorTransaction,
BackendStat,
DatabaseFlags,
Environment,
EnvironmentBuilder,
Error,
Info,
Stat,
};
use crate::error::StoreError;
use crate::readwrite::{
Reader,
Writer,
};
use crate::store::integer::IntegerStore;
use crate::store::keys::PrimitiveInt;
use crate::store::integermulti::MultiIntegerStore;
use crate::store::keys::PrimitiveInt;
use crate::store::multi::MultiStore;
use crate::store::single::SingleStore;
use crate::store::Options as StoreOptions;
pub static DEFAULT_MAX_DBS: c_uint = 5;
/// Wrapper around an `lmdb::Environment`.
/// Wrapper around an `Environment` (e.g. an LMDB environment).
#[derive(Debug)]
pub struct Rkv {
pub struct Rkv<E> {
path: PathBuf,
env: Environment,
env: E,
}
/// Static methods.
impl Rkv {
pub fn environment_builder() -> EnvironmentBuilder {
Environment::new()
impl<'env, E> Rkv<E>
where
E: BackendEnvironment<'env>,
{
pub fn environment_builder<B>() -> B
where
B: BackendEnvironmentBuilder<'env, Environment = E>,
{
B::new()
}
/// Return a new Rkv environment that supports up to `DEFAULT_MAX_DBS` open databases.
#[allow(clippy::new_ret_no_self)]
pub fn new(path: &Path) -> Result<Rkv, StoreError> {
Rkv::with_capacity(path, DEFAULT_MAX_DBS)
pub fn new<B>(path: &Path) -> Result<Rkv<E>, StoreError>
where
B: BackendEnvironmentBuilder<'env, Environment = E>,
{
Rkv::with_capacity::<B>(path, DEFAULT_MAX_DBS)
}
/// Return a new Rkv environment from the provided builder.
pub fn from_env(path: &Path, env: EnvironmentBuilder) -> Result<Rkv, StoreError> {
pub fn from_env<B>(path: &Path, builder: B) -> Result<Rkv<E>, StoreError>
where
B: BackendEnvironmentBuilder<'env, Environment = E>,
{
if !path.is_dir() {
return Err(StoreError::DirectoryDoesNotExistError(path.into()));
}
Ok(Rkv {
path: path.into(),
env: env.open(path).map_err(|e| match e {
lmdb::Error::Other(2) => StoreError::DirectoryDoesNotExistError(path.into()),
e => StoreError::LmdbError(e),
env: builder.open(path).map_err(|e| match e.into() {
StoreError::OtherError(2) => StoreError::DirectoryDoesNotExistError(path.into()),
e => e,
})?,
})
}
/// Return a new Rkv environment that supports the specified number of open databases.
pub fn with_capacity(path: &Path, max_dbs: c_uint) -> Result<Rkv, StoreError> {
pub fn with_capacity<B>(path: &Path, max_dbs: c_uint) -> Result<Rkv<E>, StoreError>
where
B: BackendEnvironmentBuilder<'env, Environment = E>,
{
if !path.is_dir() {
return Err(StoreError::DirectoryDoesNotExistError(path.into()));
}
let mut builder = Rkv::environment_builder();
let mut builder = B::new();
builder.set_max_dbs(max_dbs);
// Future: set flags, maximum size, etc. here if necessary.
@ -91,11 +102,18 @@ impl Rkv {
}
/// Store creation methods.
impl Rkv {
impl<'env, E> Rkv<E>
where
E: BackendEnvironment<'env>,
{
/// Create or Open an existing database in (&[u8] -> Single Value) mode.
/// Note: that create=true cannot be called concurrently with other operations
/// so if you are sure that the database exists, call this with create=false.
pub fn open_single<'s, T>(&self, name: T, opts: StoreOptions) -> Result<SingleStore, StoreError>
pub fn open_single<'s, T>(
&self,
name: T,
opts: StoreOptions<E::Flags>,
) -> Result<SingleStore<E::Database>, StoreError>
where
T: Into<Option<&'s str>>,
{
@ -105,12 +123,13 @@ impl Rkv {
/// Create or Open an existing database in (Integer -> Single Value) mode.
/// Note: that create=true cannot be called concurrently with other operations
/// so if you are sure that the database exists, call this with create=false.
pub fn open_integer<'s, T, K: PrimitiveInt>(
pub fn open_integer<'s, T, K>(
&self,
name: T,
mut opts: StoreOptions,
) -> Result<IntegerStore<K>, StoreError>
mut opts: StoreOptions<E::Flags>,
) -> Result<IntegerStore<E::Database, K>, StoreError>
where
K: PrimitiveInt,
T: Into<Option<&'s str>>,
{
opts.flags.set(DatabaseFlags::INTEGER_KEY, true);
@ -120,7 +139,11 @@ impl Rkv {
/// Create or Open an existing database in (&[u8] -> Multiple Values) mode.
/// Note: that create=true cannot be called concurrently with other operations
/// so if you are sure that the database exists, call this with create=false.
pub fn open_multi<'s, T>(&self, name: T, mut opts: StoreOptions) -> Result<MultiStore, StoreError>
pub fn open_multi<'s, T>(
&self,
name: T,
mut opts: StoreOptions<E::Flags>,
) -> Result<MultiStore<E::Database>, StoreError>
where
T: Into<Option<&'s str>>,
{
@ -131,12 +154,13 @@ impl Rkv {
/// Create or Open an existing database in (Integer -> Multiple Values) mode.
/// Note: that create=true cannot be called concurrently with other operations
/// so if you are sure that the database exists, call this with create=false.
pub fn open_multi_integer<'s, T, K: PrimitiveInt>(
pub fn open_multi_integer<'s, T, K>(
&self,
name: T,
mut opts: StoreOptions,
) -> Result<MultiIntegerStore<K>, StoreError>
mut opts: StoreOptions<E::Flags>,
) -> Result<MultiIntegerStore<E::Database, K>, StoreError>
where
K: PrimitiveInt,
T: Into<Option<&'s str>>,
{
opts.flags.set(DatabaseFlags::INTEGER_KEY, true);
@ -144,43 +168,57 @@ impl Rkv {
self.open(name, opts).map(MultiIntegerStore::new)
}
fn open<'s, T>(&self, name: T, opts: StoreOptions) -> Result<Database, StoreError>
fn open<'s, T>(&self, name: T, opts: StoreOptions<E::Flags>) -> Result<E::Database, StoreError>
where
T: Into<Option<&'s str>>,
{
if opts.create {
self.env.create_db(name.into(), opts.flags).map_err(|e| match e {
lmdb::Error::BadRslot => StoreError::open_during_transaction(),
_ => e.into(),
self.env.create_db(name.into(), opts.flags).map_err(|e| match e.into() {
StoreError::LmdbError(lmdb::Error::BadRslot) => StoreError::open_during_transaction(),
e => e,
})
} else {
self.env.open_db(name.into()).map_err(|e| match e {
lmdb::Error::BadRslot => StoreError::open_during_transaction(),
_ => e.into(),
self.env.open_db(name.into()).map_err(|e| match e.into() {
StoreError::LmdbError(lmdb::Error::BadRslot) => StoreError::open_during_transaction(),
e => e,
})
}
}
}
/// Read and write accessors.
impl Rkv {
impl<'env, E> Rkv<E>
where
E: BackendEnvironment<'env>,
{
/// Create a read transaction. There can be multiple concurrent readers
/// for an environment, up to the maximum specified by LMDB (default 126),
/// and you can open readers while a write transaction is active.
pub fn read(&self) -> Result<Reader, StoreError> {
Ok(Reader::new(self.env.begin_ro_txn().map_err(StoreError::from)?))
pub fn read<T>(&'env self) -> Result<Reader<T>, StoreError>
where
E: BackendEnvironment<'env, RoTransaction = T>,
T: BackendRoCursorTransaction<'env, Database = E::Database>,
{
Ok(Reader::new(self.env.begin_ro_txn().map_err(|e| e.into())?))
}
/// Create a write transaction. There can be only one write transaction
/// active at any given time, so trying to create a second one will block
/// until the first is committed or aborted.
pub fn write(&self) -> Result<Writer, StoreError> {
Ok(Writer::new(self.env.begin_rw_txn().map_err(StoreError::from)?))
pub fn write<T>(&'env self) -> Result<Writer<T>, StoreError>
where
E: BackendEnvironment<'env, RwTransaction = T>,
T: BackendRwCursorTransaction<'env, Database = E::Database>,
{
Ok(Writer::new(self.env.begin_rw_txn().map_err(|e| e.into())?))
}
}
/// Other environment methods.
impl Rkv {
impl<'env, E> Rkv<E>
where
E: BackendEnvironment<'env>,
{
/// Flush the data buffers to disk. This call is only useful, when the environment
/// was open with either `NO_SYNC`, `NO_META_SYNC` or `MAP_ASYNC` (see below).
/// The call is not valid if the environment was opened with `READ_ONLY`.
@ -194,7 +232,7 @@ impl Rkv {
/// Otherwise if the environment has the `NO_SYNC` flag set the flushes will be omitted,
/// and with `MAP_ASYNC` they will be asynchronous.
pub fn sync(&self, force: bool) -> Result<(), StoreError> {
self.env.sync(force).map_err(Into::into)
self.env.sync(force).map_err(|e| e.into())
}
/// Retrieve statistics about this environment.
@ -206,8 +244,8 @@ impl Rkv {
/// * Number of leaf pages
/// * Number of overflow pages
/// * Number of data entries
pub fn stat(&self) -> Result<Stat, StoreError> {
self.env.stat().map_err(Into::into)
pub fn stat(&self) -> Result<E::Stat, StoreError> {
self.env.stat().map_err(|e| e.into())
}
/// Retrieve information about this environment.
@ -218,8 +256,8 @@ impl Rkv {
/// * The last transaction ID
/// * Max number of readers allowed
/// * Number of readers in use
pub fn info(&self) -> Result<Info, StoreError> {
self.env.info().map_err(Into::into)
pub fn info(&self) -> Result<E::Info, StoreError> {
self.env.info().map_err(|e| e.into())
}
/// Retrieve the load ratio (# of used pages / total pages) about this environment.
@ -228,12 +266,12 @@ impl Rkv {
pub fn load_ratio(&self) -> Result<f32, StoreError> {
let stat = self.stat()?;
let info = self.info()?;
let freelist = self.env.freelist()?;
let freelist = self.env.freelist().map_err(|e| e.into())?;
let last_pgno = info.last_pgno() + 1; // pgno is 0 based.
let total_pgs = info.map_size() / stat.page_size() as usize;
let total_pgs = info.map_size() / stat.page_size();
if freelist > last_pgno {
return Err(StoreError::LmdbError(Error::Corrupted));
return Err(StoreError::DatabaseCorrupted);
}
let used_pgs = last_pgno - freelist;
Ok(used_pgs as f32 / total_pgs as f32)
@ -286,6 +324,12 @@ mod tests {
use super::*;
use crate::*;
use crate::backend::{
LmdbDatabase,
LmdbEnvironment,
LmdbRwTransaction,
};
// The default size is 1MB.
const DEFAULT_SIZE: usize = 1024 * 1024;
@ -299,7 +343,7 @@ mod tests {
assert!(!nope.exists());
let pb = nope.to_path_buf();
match Rkv::new(nope.as_path()).err() {
match Rkv::new::<backend::Lmdb>(nope.as_path()).err() {
Some(StoreError::DirectoryDoesNotExistError(p)) => {
assert_eq!(pb, p);
},
@ -307,7 +351,7 @@ mod tests {
};
}
fn check_rkv(k: &Rkv) {
fn check_rkv(k: &Rkv<LmdbEnvironment>) {
let _ = k.open_single("default", StoreOptions::create()).expect("created default");
let yyy = k.open_single("yyy", StoreOptions::create()).expect("opened");
@ -324,7 +368,7 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
check_rkv(&k);
}
@ -336,7 +380,7 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let mut builder = Rkv::environment_builder();
let mut builder = Rkv::environment_builder::<backend::Lmdb>();
builder.set_max_dbs(2);
let k = Rkv::from_env(root.path(), builder).expect("rkv");
@ -351,7 +395,7 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let k = Rkv::with_capacity(root.path(), 1).expect("rkv");
let k = Rkv::with_capacity::<backend::Lmdb>(root.path(), 1).expect("rkv");
check_rkv(&k);
@ -384,8 +428,8 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("test", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("test", StoreOptions::create()).expect("opened");
// Writing a large enough value should cause LMDB to fail on MapFull.
// We write a string that is larger than the default map size.
@ -402,8 +446,8 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("test", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("test", StoreOptions::create()).expect("opened");
let key = "k".repeat(512);
let mut writer = k.write().expect("writer");
@ -417,13 +461,13 @@ mod tests {
fs::create_dir_all(root.path()).expect("dir created");
assert!(root.path().is_dir());
let mut builder = Rkv::environment_builder();
let mut builder = Rkv::environment_builder::<backend::Lmdb>();
// Set the map size to the size of the value we'll store in it + 100KiB,
// which ensures that there's enough space for the value and metadata.
builder.set_map_size(get_larger_than_default_map_size_value() + 100 * 1024 /* 100KiB */);
builder.set_max_dbs(2);
let k = Rkv::from_env(root.path(), builder).unwrap();
let sk: SingleStore = k.open_single("test", StoreOptions::create()).expect("opened");
let sk = k.open_single("test", StoreOptions::create()).expect("opened");
let val = "x".repeat(get_larger_than_default_map_size_value());
let mut writer = k.write().expect("writer");
@ -438,9 +482,9 @@ mod tests {
fn test_round_trip_and_transactions() {
let root = Builder::new().prefix("test_round_trip_and_transactions").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
{
let mut writer = k.write().expect("writer");
@ -539,9 +583,9 @@ mod tests {
fn test_single_store_clear() {
let root = Builder::new().prefix("test_single_store_clear").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
{
let mut writer = k.write().expect("writer");
@ -568,7 +612,7 @@ mod tests {
fn test_multi_put_get_del() {
let root = Builder::new().prefix("test_multi_put_get_del").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let multistore = k.open_multi("multistore", StoreOptions::create()).unwrap();
let mut writer = k.write().unwrap();
multistore.put(&mut writer, "str1", &Value::Str("str1 foo")).unwrap();
@ -604,7 +648,7 @@ mod tests {
fn test_multiple_store_clear() {
let root = Builder::new().prefix("test_multiple_store_clear").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let multistore = k.open_multi("multistore", StoreOptions::create()).expect("opened");
@ -637,7 +681,7 @@ mod tests {
fn test_open_store_for_read() {
let root = Builder::new().prefix("test_open_store_for_read").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
// First create the store, and start a write transaction on it.
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
@ -657,7 +701,7 @@ mod tests {
fn test_open_a_missing_store() {
let root = Builder::new().prefix("test_open_a_missing_store").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let _sk = k.open("sk", StoreOptions::default()).expect("open a missing store");
}
@ -665,13 +709,13 @@ mod tests {
fn test_open_fail_with_badrslot() {
let root = Builder::new().prefix("test_open_fail_with_badrslot").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
// First create the store
let _sk = k.open_single("sk", StoreOptions::create()).expect("opened");
// Open a reader on this store
let _reader = k.read().expect("reader");
// Open the same store for read while the reader is in progress will panic
let store: Result<SingleStore, StoreError> = k.open_single("sk", StoreOptions::default());
let store = k.open_single("sk", StoreOptions::default());
match store {
Err(StoreError::OpenAttemptedDuringTransaction(_thread_id)) => (),
_ => panic!("should panic"),
@ -682,15 +726,15 @@ mod tests {
fn test_read_before_write_num() {
let root = Builder::new().prefix("test_read_before_write_num").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
// Test reading a number, modifying it, and then writing it back.
// We have to be done with the Value::I64 before calling Writer::put,
// as the Value::I64 borrows an immutable reference to the Writer.
// So we extract and copy its primitive value.
fn get_existing_foo(writer: &Writer, store: SingleStore) -> Option<i64> {
fn get_existing_foo(store: SingleStore<LmdbDatabase>, writer: &Writer<LmdbRwTransaction>) -> Option<i64> {
match store.get(writer, "foo").expect("read") {
Some(Value::I64(val)) => Some(val),
_ => None,
@ -698,11 +742,11 @@ mod tests {
}
let mut writer = k.write().expect("writer");
let mut existing = get_existing_foo(&writer, sk).unwrap_or(99);
let mut existing = get_existing_foo(sk, &writer).unwrap_or(99);
existing += 1;
sk.put(&mut writer, "foo", &Value::I64(existing)).expect("success");
let updated = get_existing_foo(&writer, sk).unwrap_or(99);
let updated = get_existing_foo(sk, &writer).unwrap_or(99);
assert_eq!(updated, 100);
writer.commit().expect("commit");
}
@ -711,8 +755,8 @@ mod tests {
fn test_read_before_write_str() {
let root = Builder::new().prefix("test_read_before_write_str").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
// Test reading a string, modifying it, and then writing it back.
// We have to be done with the Value::Str before calling Writer::put,
@ -734,7 +778,7 @@ mod tests {
fn test_concurrent_read_transactions_prohibited() {
let root = Builder::new().prefix("test_concurrent_reads_prohibited").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let _first = k.read().expect("reader");
let second = k.read();
@ -743,6 +787,9 @@ mod tests {
Err(StoreError::ReadTransactionAlreadyExists(t)) => {
println!("Thread was {:?}", t);
},
Err(e) => {
println!("Got error {:?}", e);
},
_ => {
panic!("Expected error.");
},
@ -753,8 +800,8 @@ mod tests {
fn test_isolation() {
let root = Builder::new().prefix("test_isolation").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s: SingleStore = k.open_single("s", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s = k.open_single("s", StoreOptions::create()).expect("opened");
// Add one field.
{
@ -795,8 +842,8 @@ mod tests {
fn test_blob() {
let root = Builder::new().prefix("test_round_trip_blob").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
assert_eq!(sk.get(&writer, "foo").expect("read"), None);
@ -832,12 +879,12 @@ mod tests {
fn test_sync() {
let root = Builder::new().prefix("test_sync").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let mut builder = Rkv::environment_builder();
let mut builder = Rkv::environment_builder::<backend::Lmdb>();
builder.set_max_dbs(1);
builder.set_flags(EnvironmentFlags::NO_SYNC);
{
let k = Rkv::from_env(root.path(), builder).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
{
let mut writer = k.write().expect("writer");
@ -847,7 +894,7 @@ mod tests {
}
}
let k = Rkv::from_env(root.path(), builder).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::default()).expect("opened");
let sk = k.open_single("sk", StoreOptions::default()).expect("opened");
let reader = k.read().expect("reader");
assert_eq!(sk.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
}
@ -856,10 +903,9 @@ mod tests {
fn test_stat() {
let root = Builder::new().prefix("test_stat").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
for i in 0..5 {
let sk: IntegerStore<u32> =
k.open_integer(&format!("sk{}", i)[..], StoreOptions::create()).expect("opened");
let sk = k.open_integer(&format!("sk{}", i)[..], StoreOptions::create()).expect("opened");
{
let mut writer = k.write().expect("writer");
sk.put(&mut writer, i, &Value::I64(i64::from(i))).expect("wrote");
@ -876,8 +922,8 @@ mod tests {
fn test_info() {
let root = Builder::new().prefix("test_info").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
sk.put(&mut writer, "foo", &Value::Str("bar")).expect("wrote");
@ -906,8 +952,8 @@ mod tests {
fn test_load_ratio() {
let root = Builder::new().prefix("test_load_ratio").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
sk.put(&mut writer, "foo", &Value::Str("bar")).expect("wrote");
writer.commit().expect("commited");
@ -935,8 +981,8 @@ mod tests {
fn test_set_map_size() {
let root = Builder::new().prefix("test_size_map_size").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
assert_eq!(k.info().expect("info").map_size(), DEFAULT_SIZE);
@ -954,8 +1000,8 @@ mod tests {
fn test_iter() {
let root = Builder::new().prefix("test_iter").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
// An iterator over an empty store returns no values.
{
@ -1028,8 +1074,8 @@ mod tests {
fn test_iter_from_key_greater_than_existing() {
let root = Builder::new().prefix("test_iter_from_key_greater_than_existing").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let sk: SingleStore = k.open_single("sk", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let sk = k.open_single("sk", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
sk.put(&mut writer, "foo", &Value::I64(1234)).expect("wrote");
@ -1047,11 +1093,11 @@ mod tests {
fn test_multiple_store_read_write() {
let root = Builder::new().prefix("test_multiple_store_read_write").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s1: SingleStore = k.open_single("store_1", StoreOptions::create()).expect("opened");
let s2: SingleStore = k.open_single("store_2", StoreOptions::create()).expect("opened");
let s3: SingleStore = k.open_single("store_3", StoreOptions::create()).expect("opened");
let s1 = k.open_single("store_1", StoreOptions::create()).expect("opened");
let s2 = k.open_single("store_2", StoreOptions::create()).expect("opened");
let s3 = k.open_single("store_3", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
s1.put(&mut writer, "foo", &Value::Str("bar")).expect("wrote");
@ -1087,9 +1133,9 @@ mod tests {
fn test_multiple_store_iter() {
let root = Builder::new().prefix("test_multiple_store_iter").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let s1: SingleStore = k.open_single("store_1", StoreOptions::create()).expect("opened");
let s2: SingleStore = k.open_single("store_2", StoreOptions::create()).expect("opened");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s1 = k.open_single("store_1", StoreOptions::create()).expect("opened");
let s2 = k.open_single("store_2", StoreOptions::create()).expect("opened");
let mut writer = k.write().expect("writer");
// Write to "s1"
@ -1199,7 +1245,7 @@ mod tests {
fn test_store_multiple_thread() {
let root = Builder::new().prefix("test_multiple_thread").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let rkv_arc = Arc::new(RwLock::new(Rkv::new(root.path()).expect("new succeeded")));
let rkv_arc = Arc::new(RwLock::new(Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded")));
let store = rkv_arc.read().unwrap().open_single("test", StoreOptions::create()).expect("opened");
let num_threads = 10;
@ -1252,7 +1298,7 @@ mod tests {
#[test]
fn test_use_value_as_key() {
let root = Builder::new().prefix("test_use_value_as_key").tempdir().expect("tempdir");
let rkv = Rkv::new(root.path()).expect("new succeeded");
let rkv = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let store = rkv.open_single("store", StoreOptions::create()).expect("opened");
{

@ -8,11 +8,14 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::io;
use std::num;
use std::path::PathBuf;
use std::str;
use std::thread;
use std::thread::ThreadId;
use bincode;
use failure::Fail;
use lmdb;
use crate::value::Type;
@ -51,8 +54,14 @@ impl From<Box<bincode::ErrorKind>> for DataError {
#[derive(Debug, Fail)]
pub enum StoreError {
#[fail(display = "database corrupted")]
DatabaseCorrupted,
#[fail(display = "key/value pair not found")]
KeyValuePairNotFound,
#[fail(display = "I/O error: {:?}", _0)]
IoError(::std::io::Error),
IoError(io::Error),
#[fail(display = "directory does not exist or not a directory: {:?}", _0)]
DirectoryDoesNotExistError(PathBuf),
@ -64,24 +73,22 @@ pub enum StoreError {
LmdbError(lmdb::Error),
#[fail(display = "read transaction already exists in thread {:?}", _0)]
ReadTransactionAlreadyExists(::std::thread::ThreadId),
ReadTransactionAlreadyExists(ThreadId),
#[fail(display = "attempted to open DB during transaction in thread {:?}", _0)]
OpenAttemptedDuringTransaction(::std::thread::ThreadId),
OpenAttemptedDuringTransaction(ThreadId),
#[fail(display = "other backing store error: {}", _0)]
OtherError(i32),
}
impl StoreError {
pub fn open_during_transaction() -> StoreError {
StoreError::OpenAttemptedDuringTransaction(::std::thread::current().id())
StoreError::OpenAttemptedDuringTransaction(thread::current().id())
}
}
impl From<lmdb::Error> for StoreError {
fn from(e: lmdb::Error) -> StoreError {
match e {
lmdb::Error::BadRslot => StoreError::ReadTransactionAlreadyExists(::std::thread::current().id()),
e => StoreError::LmdbError(e),
}
pub fn read_transaction_already_exists() -> StoreError {
StoreError::ReadTransactionAlreadyExists(thread::current().id())
}
}
@ -91,8 +98,8 @@ impl From<DataError> for StoreError {
}
}
impl From<::std::io::Error> for StoreError {
fn from(e: ::std::io::Error) -> StoreError {
impl From<io::Error> for StoreError {
fn from(e: io::Error) -> StoreError {
StoreError::IoError(e)
}
}
@ -109,7 +116,7 @@ pub enum MigrateError {
IndeterminateBitDepth,
#[fail(display = "I/O error: {:?}", _0)]
IoError(::std::io::Error),
IoError(io::Error),
#[fail(display = "invalid DatabaseFlags bits")]
InvalidDatabaseBits,
@ -136,7 +143,7 @@ pub enum MigrateError {
StringConversionError,
#[fail(display = "TryFromInt error: {:?}", _0)]
TryFromIntError(::std::num::TryFromIntError),
TryFromIntError(num::TryFromIntError),
#[fail(display = "unexpected Page variant")]
UnexpectedPageVariant,
@ -148,23 +155,23 @@ pub enum MigrateError {
UnsupportedPageHeaderVariant,
#[fail(display = "UTF8 error: {:?}", _0)]
Utf8Error(::std::str::Utf8Error),
Utf8Error(str::Utf8Error),
}
impl From<::std::io::Error> for MigrateError {
fn from(e: ::std::io::Error) -> MigrateError {
impl From<io::Error> for MigrateError {
fn from(e: io::Error) -> MigrateError {
MigrateError::IoError(e)
}
}
impl From<::std::str::Utf8Error> for MigrateError {
fn from(e: ::std::str::Utf8Error) -> MigrateError {
impl From<str::Utf8Error> for MigrateError {
fn from(e: str::Utf8Error) -> MigrateError {
MigrateError::Utf8Error(e)
}
}
impl From<::std::num::TryFromIntError> for MigrateError {
fn from(e: ::std::num::TryFromIntError) -> MigrateError {
impl From<num::TryFromIntError> for MigrateError {
fn from(e: num::TryFromIntError) -> MigrateError {
MigrateError::TryFromIntError(e)
}
}
@ -183,8 +190,6 @@ impl From<String> for MigrateError {
impl From<lmdb::Error> for MigrateError {
fn from(e: lmdb::Error) -> MigrateError {
match e {
e => MigrateError::LmdbError(e),
}
MigrateError::LmdbError(e)
}
}
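With the `BadRslot` special-casing dropped from `From<lmdb::Error>`, each backend is expected to translate its own error codes into `StoreError` via the new `read_transaction_already_exists()` helper. A minimal sketch of such a mapping, assuming a hypothetical backend-side wrapper named `ErrorImpl` (the real translation lives in src/backend/impl_lmdb/error.rs, which this excerpt does not show):

use crate::error::StoreError;

// Hypothetical wrapper around the raw LMDB error, named ErrorImpl only for
// illustration; the real type is defined in src/backend/impl_lmdb/error.rs.
pub struct ErrorImpl(lmdb::Error);

impl From<ErrorImpl> for StoreError {
    fn from(e: ErrorImpl) -> StoreError {
        match e.0 {
            // The thread-id bookkeeping that the removed From<lmdb::Error>
            // impl did inline now happens behind the backend boundary.
            lmdb::Error::BadRslot => StoreError::read_transaction_already_exists(),
            lmdb::Error::NotFound => StoreError::KeyValuePairNotFound,
            other => StoreError::LmdbError(other),
        }
    }
}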

@ -0,0 +1,44 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::io;
use std::path::{
Path,
PathBuf,
};
use url::Url;
use crate::error::StoreError;
use crate::value::Value;
pub(crate) fn read_transform(value: Result<&[u8], StoreError>) -> Result<Option<Value>, StoreError> {
match value {
Ok(bytes) => Value::from_tagged_slice(bytes).map(Some).map_err(StoreError::DataError),
Err(StoreError::KeyValuePairNotFound) => Ok(None),
Err(e) => Err(e),
}
}
// Workaround the UNC path on Windows, see https://github.com/rust-lang/rust/issues/42869.
// Otherwise, `Env::from_env()` will panic with error_no(123).
pub(crate) fn canonicalize_path<'p, P>(path: P) -> io::Result<PathBuf>
where
P: Into<&'p Path>,
{
let canonical = path.into().canonicalize()?;
Ok(if cfg!(target_os = "windows") {
let url = Url::from_file_path(&canonical).map_err(|_| io::Error::new(io::ErrorKind::Other, "passing error"))?;
url.to_file_path().map_err(|_| io::Error::new(io::ErrorKind::Other, "path canonicalization error"))?
} else {
canonical
})
}

@ -41,6 +41,7 @@
//! ## Basic Usage
//! ```
//! use rkv::{Manager, Rkv, SingleStore, Value, StoreOptions};
//! use rkv::backend::Lmdb;
//! use std::fs;
//! use tempfile::Builder;
//!
@ -60,11 +61,12 @@
//! // at most once by caching a handle to each environment that it opens.
//! // Use it to retrieve the handle to an opened environment—or create one
//! // if it hasn't already been opened:
//! let created_arc = Manager::singleton().write().unwrap().get_or_create(path, Rkv::new).unwrap();
//! let mut manager = Manager::singleton().write().unwrap();
//! let created_arc = manager.get_or_create(path, Rkv::new::<Lmdb>).unwrap();
//! let env = created_arc.read().unwrap();
//!
//! // Then you can use the environment handle to get a handle to a datastore:
//! let store: SingleStore = env.open_single("mydb", StoreOptions::create()).unwrap();
//! let store = env.open_single("mydb", StoreOptions::create()).unwrap();
//!
//! {
//! // Use a write transaction to mutate the store via a `Writer`.
@ -199,60 +201,44 @@
#![allow(dead_code)]
pub use lmdb::{
DatabaseFlags,
EnvironmentBuilder,
EnvironmentFlags,
WriteFlags,
};
mod env;
pub mod error;
mod error;
mod helpers;
mod manager;
pub mod migrate;
mod readwrite;
pub mod backend;
pub mod migrate;
pub mod store;
pub mod value;
pub use lmdb::{
Cursor,
Database,
Info,
Iter as LmdbIter,
RoCursor,
Stat,
pub use backend::{
DatabaseFlags,
EnvironmentFlags,
WriteFlags,
};
pub use self::readwrite::{
pub use env::Rkv;
pub use error::{
DataError,
MigrateError,
StoreError,
};
pub use manager::Manager;
pub use readwrite::{
Readable,
Reader,
Writer,
};
pub use self::store::integer::IntegerStore;
pub use self::store::integermulti::MultiIntegerStore;
pub use self::store::keys::PrimitiveInt;
pub use self::store::multi::MultiStore;
pub use self::store::single::SingleStore;
pub use self::store::Options as StoreOptions;
pub use self::env::Rkv;
pub use self::error::{
DataError,
StoreError,
pub use store::integer::IntegerStore;
pub use store::integermulti::MultiIntegerStore;
pub use store::keys::{
EncodableKey,
PrimitiveInt,
};
pub use self::manager::Manager;
pub use self::value::{
pub use store::multi::MultiStore;
pub use store::single::SingleStore;
pub use store::Options as StoreOptions;
pub use value::{
OwnedValue,
Value,
};
fn read_transform(val: Result<&[u8], lmdb::Error>) -> Result<Option<Value>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(Some).map_err(StoreError::DataError),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(StoreError::LmdbError(e)),
}
}

@ -8,74 +8,50 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lazy_static::lazy_static;
use std::collections::BTreeMap;
use std::io::{
self,
Error,
ErrorKind,
};
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::os::raw::c_uint;
use std::path::{
Path,
PathBuf,
};
use std::result;
use std::sync::{
Arc,
RwLock,
};
use url::Url;
use lazy_static::lazy_static;
use crate::backend::LmdbEnvironment;
use crate::error::StoreError;
use crate::helpers::canonicalize_path;
use crate::Rkv;
type Result<T> = result::Result<T, StoreError>;
type SharedRkv<E> = Arc<RwLock<Rkv<E>>>;
lazy_static! {
/// A process is only permitted to have one open handle to each Rkv environment.
/// This manager exists to enforce that constraint: don't open environments directly.
static ref MANAGER: RwLock<Manager> = RwLock::new(Manager::new());
}
// Workaround the UNC path on Windows, see https://github.com/rust-lang/rust/issues/42869.
// Otherwise, `Env::from_env()` will panic with error_no(123).
fn canonicalize_path<'p, P>(path: P) -> io::Result<PathBuf>
where
P: Into<&'p Path>,
{
let canonical = path.into().canonicalize()?;
if cfg!(target_os = "windows") {
let url = Url::from_file_path(&canonical).map_err(|_e| Error::new(ErrorKind::Other, "URL passing error"))?;
return url.to_file_path().map_err(|_e| Error::new(ErrorKind::Other, "path canonicalization error"));
}
Ok(canonical)
static ref MANAGER_LMDB: RwLock<Manager<LmdbEnvironment>> = RwLock::new(Manager::new());
}
/// A process is only permitted to have one open handle to each Rkv environment.
/// This manager exists to enforce that constraint: don't open environments directly.
pub struct Manager {
environments: BTreeMap<PathBuf, Arc<RwLock<Rkv>>>,
pub struct Manager<E> {
environments: BTreeMap<PathBuf, SharedRkv<E>>,
}
impl Manager {
fn new() -> Manager {
impl<E> Manager<E> {
fn new() -> Manager<E> {
Manager {
environments: Default::default(),
}
}
pub fn singleton() -> &'static RwLock<Manager> {
&*MANAGER
}
/// Return the open env at `path`, returning `None` if it has not already been opened.
pub fn get<'p, P>(&self, path: P) -> Result<Option<Arc<RwLock<Rkv>>>, ::std::io::Error>
pub fn get<'p, P>(&self, path: P) -> Result<Option<SharedRkv<E>>>
where
P: Into<&'p Path>,
{
@ -84,9 +60,9 @@ impl Manager {
}
/// Return the open env at `path`, or create it by calling `f`.
pub fn get_or_create<'p, F, P>(&mut self, path: P, f: F) -> Result<Arc<RwLock<Rkv>>, StoreError>
pub fn get_or_create<'p, F, P>(&mut self, path: P, f: F) -> Result<SharedRkv<E>>
where
F: FnOnce(&Path) -> Result<Rkv, StoreError>,
F: FnOnce(&Path) -> Result<Rkv<E>>,
P: Into<&'p Path>,
{
let canonical = canonicalize_path(path)?;
@ -101,14 +77,9 @@ impl Manager {
/// Return the open env at `path` with capacity `capacity`,
/// or create it by calling `f`.
pub fn get_or_create_with_capacity<'p, F, P>(
&mut self,
path: P,
capacity: c_uint,
f: F,
) -> Result<Arc<RwLock<Rkv>>, StoreError>
pub fn get_or_create_with_capacity<'p, F, P>(&mut self, path: P, capacity: c_uint, f: F) -> Result<SharedRkv<E>>
where
F: FnOnce(&Path, c_uint) -> Result<Rkv, StoreError>,
F: FnOnce(&Path, c_uint) -> Result<Rkv<E>>,
P: Into<&'p Path>,
{
let canonical = canonicalize_path(path)?;
@ -122,12 +93,37 @@ impl Manager {
}
}
impl Manager<LmdbEnvironment> {
pub fn singleton() -> &'static RwLock<Manager<LmdbEnvironment>> {
&*MANAGER_LMDB
}
}
#[cfg(test)]
mod tests {
use std::fs;
use tempfile::Builder;
use super::*;
use crate::*;
use backend::Lmdb;
/// Test that a manager can be created with simple type inference.
#[test]
fn test_simple() {
let _ = Manager::singleton().write().unwrap();
}
/// Test that a shared Rkv instance can be created with simple type inference.
#[test]
fn test_simple_2() {
let root = Builder::new().prefix("test_simple").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let mut manager = Manager::singleton().write().unwrap();
let _ = manager.get_or_create(root.path(), Rkv::new::<Lmdb>).unwrap();
}
/// Test that the manager will return the same Rkv instance each time for each path.
#[test]
@ -135,12 +131,12 @@ mod tests {
let root = Builder::new().prefix("test_same").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let mut manager = Manager::new();
let mut manager = Manager::<LmdbEnvironment>::new();
let p = root.path();
assert!(manager.get(p).expect("success").is_none());
let created_arc = manager.get_or_create(p, Rkv::new).expect("created");
let created_arc = manager.get_or_create(p, Rkv::new::<Lmdb>).expect("created");
let fetched_arc = manager.get(p).expect("success").expect("existed");
assert!(Arc::ptr_eq(&created_arc, &fetched_arc));
}
@ -148,12 +144,12 @@ mod tests {
/// Test that one can mutate managed Rkv instances in surprising ways.
#[test]
fn test_mutate_managed_rkv() {
let mut manager = Manager::new();
let mut manager = Manager::<LmdbEnvironment>::new();
let root1 = Builder::new().prefix("test_mutate_managed_rkv_1").tempdir().expect("tempdir");
fs::create_dir_all(root1.path()).expect("dir created");
let path1 = root1.path();
let arc = manager.get_or_create(path1, Rkv::new).expect("created");
let arc = manager.get_or_create(path1, Rkv::new::<Lmdb>).expect("created");
// Arc<RwLock<>> has interior mutability, so we can replace arc's Rkv
// instance with a new instance that has a different path.
@ -162,7 +158,7 @@ mod tests {
let path2 = root2.path();
{
let mut rkv = arc.write().expect("guard");
let rkv2 = Rkv::new(path2).expect("Rkv");
let rkv2 = Rkv::new::<Lmdb>(path2).expect("Rkv");
*rkv = rkv2;
}
@ -174,7 +170,7 @@ mod tests {
// Meanwhile, a new Arc for path2 has a different pointer, even though
// its Rkv's path is the same as arc's current path.
let path2_arc = manager.get_or_create(path2, Rkv::new).expect("success");
let path2_arc = manager.get_or_create(path2, Rkv::new::<Lmdb>).expect("success");
assert!(!Arc::ptr_eq(&path2_arc, &arc));
}
@ -184,12 +180,12 @@ mod tests {
let root = Builder::new().prefix("test_same").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let mut manager = Manager::new();
let mut manager = Manager::<LmdbEnvironment>::new();
let p = root.path();
assert!(manager.get(p).expect("success").is_none());
let created_arc = manager.get_or_create_with_capacity(p, 10, Rkv::with_capacity).expect("created");
let created_arc = manager.get_or_create_with_capacity(p, 10, Rkv::with_capacity::<Lmdb>).expect("created");
let fetched_arc = manager.get(p).expect("success").expect("existed");
assert!(Arc::ptr_eq(&created_arc, &fetched_arc));
}
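Since `Manager` is now generic over the backend environment and the singleton is per-backend, callers can spell the backend out explicitly instead of relying on inference. A small sketch using only the items shown above:

use std::path::Path;
use std::sync::{Arc, RwLock};

use rkv::backend::{Lmdb, LmdbEnvironment};
use rkv::{Manager, Rkv, StoreError};

// Open (or reuse) the process-wide handle for `path`, naming the backend
// explicitly instead of letting it be inferred from `Rkv::new::<Lmdb>`.
fn open_shared(path: &Path) -> Result<Arc<RwLock<Rkv<LmdbEnvironment>>>, StoreError> {
    let mut manager = Manager::<LmdbEnvironment>::singleton().write().unwrap();
    manager.get_or_create(path, Rkv::new::<Lmdb>)
}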

@ -8,88 +8,121 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lmdb::{
Database,
RoCursor,
RoTransaction,
RwTransaction,
Transaction,
WriteFlags,
use crate::backend::{
BackendDatabase,
BackendRoCursor,
BackendRoCursorTransaction,
BackendRoTransaction,
BackendRwCursorTransaction,
BackendRwTransaction,
};
use crate::error::StoreError;
use crate::read_transform;
use crate::helpers::read_transform;
use crate::value::Value;
pub struct Reader<'env>(pub RoTransaction<'env>);
pub struct Writer<'env>(pub RwTransaction<'env>);
pub struct Reader<T>(T);
pub struct Writer<T>(T);
pub trait Readable<'env> {
type Database: BackendDatabase;
type RoCursor: BackendRoCursor<'env>;
fn get<K>(&'env self, db: Self::Database, k: &K) -> Result<Option<Value<'env>>, StoreError>
where
K: AsRef<[u8]>;
pub trait Readable {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError>;
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError>;
fn open_ro_cursor(&'env self, db: Self::Database) -> Result<Self::RoCursor, StoreError>;
}
impl<'env> Readable for Reader<'env> {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError> {
let bytes = self.0.get(db, &k);
impl<'env, T> Readable<'env> for Reader<T>
where
T: BackendRoCursorTransaction<'env>,
{
type Database = T::Database;
type RoCursor = T::RoCursor;
fn get<K>(&'env self, db: T::Database, k: &K) -> Result<Option<Value<'env>>, StoreError>
where
K: AsRef<[u8]>,
{
let bytes = self.0.get(db, k.as_ref()).map_err(|e| e.into());
read_transform(bytes)
}
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(StoreError::LmdbError)
fn open_ro_cursor(&'env self, db: T::Database) -> Result<T::RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(|e| e.into())
}
}
impl<'env> Reader<'env> {
pub(crate) fn new(txn: RoTransaction) -> Reader {
impl<T> Reader<T> {
pub(crate) fn new(txn: T) -> Reader<T> {
Reader(txn)
}
}
impl<T> Reader<T>
where
T: BackendRoTransaction,
{
pub fn abort(self) {
self.0.abort();
}
}
impl<'env> Readable for Writer<'env> {
fn get<K: AsRef<[u8]>>(&self, db: Database, k: &K) -> Result<Option<Value>, StoreError> {
let bytes = self.0.get(db, &k);
impl<'env, T> Readable<'env> for Writer<T>
where
T: BackendRwCursorTransaction<'env>,
{
type Database = T::Database;
type RoCursor = T::RoCursor;
fn get<K>(&'env self, db: T::Database, k: &K) -> Result<Option<Value<'env>>, StoreError>
where
K: AsRef<[u8]>,
{
let bytes = self.0.get(db, k.as_ref()).map_err(|e| e.into());
read_transform(bytes)
}
fn open_ro_cursor(&self, db: Database) -> Result<RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(StoreError::LmdbError)
fn open_ro_cursor(&'env self, db: T::Database) -> Result<T::RoCursor, StoreError> {
self.0.open_ro_cursor(db).map_err(|e| e.into())
}
}
impl<'env> Writer<'env> {
pub(crate) fn new(txn: RwTransaction) -> Writer {
impl<T> Writer<T> {
pub(crate) fn new(txn: T) -> Writer<T> {
Writer(txn)
}
}
impl<T> Writer<T>
where
T: BackendRwTransaction,
{
pub fn commit(self) -> Result<(), StoreError> {
self.0.commit().map_err(StoreError::LmdbError)
self.0.commit().map_err(|e| e.into())
}
pub fn abort(self) {
self.0.abort();
}
pub(crate) fn put<K: AsRef<[u8]>>(
&mut self,
db: Database,
k: &K,
v: &Value,
flags: WriteFlags,
) -> Result<(), StoreError> {
pub(crate) fn put<K>(&mut self, db: T::Database, k: &K, v: &Value, flags: T::Flags) -> Result<(), StoreError>
where
K: AsRef<[u8]>,
{
// TODO: don't allocate twice.
self.0.put(db, &k, &v.to_bytes()?, flags).map_err(StoreError::LmdbError)
self.0.put(db, k.as_ref(), &v.to_bytes()?, flags).map_err(|e| e.into())
}
pub(crate) fn delete<K: AsRef<[u8]>>(&mut self, db: Database, k: &K, v: Option<&[u8]>) -> Result<(), StoreError> {
self.0.del(db, &k, v).map_err(StoreError::LmdbError)
pub(crate) fn delete<K>(&mut self, db: T::Database, k: &K, v: Option<&[u8]>) -> Result<(), StoreError>
where
K: AsRef<[u8]>,
{
self.0.del(db, k.as_ref(), v).map_err(|e| e.into())
}
pub(crate) fn clear(&mut self, db: Database) -> Result<(), StoreError> {
self.0.clear_db(db).map_err(StoreError::LmdbError)
pub(crate) fn clear(&mut self, db: T::Database) -> Result<(), StoreError> {
self.0.clear_db(db).map_err(|e| e.into())
}
}
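Because `Readable` now carries associated `Database` and `RoCursor` types, a helper can be written once and accept either a `Reader` or a `Writer`. A sketch pinned to the LMDB backend, mirroring the bounds the test_txn.rs changes below use:

use rkv::backend::{LmdbDatabase, LmdbRoCursor};
use rkv::{Readable, SingleStore, Value};

// Accepts both Reader<LmdbRoTransaction> and Writer<LmdbRwTransaction>,
// since both implement Readable with these associated types.
fn fetch_str<'env, T>(txn: &'env T, store: SingleStore<LmdbDatabase>, key: &str) -> Option<String>
where
    T: Readable<'env, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'env>>,
{
    match store.get(txn, key).expect("read") {
        Some(Value::Str(s)) => Some(s.to_owned()),
        _ => None,
    }
}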

@ -1,22 +1,35 @@
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
pub mod integer;
pub mod integermulti;
pub mod keys;
pub mod multi;
pub mod single;
pub mod keys;
use lmdb::DatabaseFlags;
use crate::backend::BackendDatabaseFlags;
#[derive(Default, Debug, Copy, Clone)]
pub struct Options {
pub struct Options<F> {
pub create: bool,
pub flags: DatabaseFlags,
pub flags: F,
}
impl Options {
pub fn create() -> Options {
impl<F> Options<F>
where
F: BackendDatabaseFlags,
{
pub fn create() -> Options<F> {
Options {
create: true,
flags: DatabaseFlags::empty(),
flags: F::empty(),
}
}
}

@ -10,56 +10,66 @@
use std::marker::PhantomData;
use lmdb::Database;
use crate::backend::{
BackendDatabase,
BackendRwTransaction,
};
use crate::error::StoreError;
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
use crate::store::single::SingleStore;
use crate::store::keys::{
Key,
PrimitiveInt,
};
use crate::store::single::SingleStore;
use crate::value::Value;
pub struct IntegerStore<K>
where
K: PrimitiveInt,
{
inner: SingleStore,
type EmptyResult = Result<(), StoreError>;
pub struct IntegerStore<D, K> {
inner: SingleStore<D>,
phantom: PhantomData<K>,
}
impl<K> IntegerStore<K>
impl<D, K> IntegerStore<D, K>
where
D: BackendDatabase,
K: PrimitiveInt,
{
pub(crate) fn new(db: Database) -> IntegerStore<K> {
pub(crate) fn new(db: D) -> IntegerStore<D, K> {
IntegerStore {
inner: SingleStore::new(db),
phantom: PhantomData,
}
}
pub fn get<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
pub fn get<'env, R>(&self, reader: &'env R, k: K) -> Result<Option<Value<'env>>, StoreError>
where
R: Readable<'env, Database = D>,
{
self.inner.get(reader, Key::new(&k)?)
}
pub fn put(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
pub fn put<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.put(writer, Key::new(&k)?, v)
}
pub fn delete(&self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
pub fn delete<T>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.delete(writer, Key::new(&k)?)
}
pub fn clear(&self, writer: &mut Writer) -> Result<(), StoreError> {
pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.clear(writer)
}
}
@ -76,7 +86,8 @@ mod tests {
fn test_integer_keys() {
let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s = k.open_integer("s", StoreOptions::create()).expect("open");
macro_rules! test_integer_keys {
@ -100,7 +111,8 @@ mod tests {
fn test_clear() {
let root = Builder::new().prefix("test_integer_clear").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s = k.open_integer("s", StoreOptions::create()).expect("open");
{

@ -8,86 +8,104 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use lmdb::{
Database,
WriteFlags,
};
use std::marker::PhantomData;
use crate::backend::{
BackendDatabase,
BackendIter,
BackendRoCursor,
BackendRwTransaction,
};
use crate::error::StoreError;
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
use crate::store::keys::{
Key,
PrimitiveInt,
};
use crate::store::multi::{
Iter,
MultiStore,
};
use crate::value::Value;
use crate::store::keys::{
Key,
PrimitiveInt,
};
type EmptyResult = Result<(), StoreError>;
pub struct MultiIntegerStore<K>
where
K: PrimitiveInt,
{
inner: MultiStore,
pub struct MultiIntegerStore<D, K> {
inner: MultiStore<D>,
phantom: PhantomData<K>,
}
impl<K> MultiIntegerStore<K>
impl<D, K> MultiIntegerStore<D, K>
where
D: BackendDatabase,
K: PrimitiveInt,
{
pub(crate) fn new(db: Database) -> MultiIntegerStore<K> {
pub(crate) fn new(db: D) -> MultiIntegerStore<D, K> {
MultiIntegerStore {
inner: MultiStore::new(db),
phantom: PhantomData,
}
}
pub fn get<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Iter<'env>, StoreError> {
pub fn get<'env, R, I, C>(&self, reader: &'env R, k: K) -> Result<Iter<'env, I, C>, StoreError>
where
R: Readable<'env, Database = D, RoCursor = C>,
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
{
self.inner.get(reader, Key::new(&k)?)
}
pub fn get_first<'env, T: Readable>(&self, reader: &'env T, k: K) -> Result<Option<Value<'env>>, StoreError> {
pub fn get_first<'env, R>(&self, reader: &'env R, k: K) -> Result<Option<Value<'env>>, StoreError>
where
R: Readable<'env, Database = D>,
{
self.inner.get_first(reader, Key::new(&k)?)
}
pub fn put(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
pub fn put<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.put(writer, Key::new(&k)?, v)
}
pub fn put_with_flags(&self, writer: &mut Writer, k: K, v: &Value, flags: WriteFlags) -> Result<(), StoreError> {
pub fn put_with_flags<T>(&self, writer: &mut Writer<T>, k: K, v: &Value, flags: T::Flags) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.put_with_flags(writer, Key::new(&k)?, v, flags)
}
pub fn delete_all(&self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
pub fn delete_all<T>(&self, writer: &mut Writer<T>, k: K) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.delete_all(writer, Key::new(&k)?)
}
pub fn delete(&self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
pub fn delete<T>(&self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.delete(writer, Key::new(&k)?, v)
}
pub fn clear(&self, writer: &mut Writer) -> Result<(), StoreError> {
pub fn clear<T>(&self, writer: &mut Writer<T>) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
self.inner.clear(writer)
}
}
#[cfg(test)]
mod tests {
extern crate tempfile;
use self::tempfile::Builder;
use std::fs;
use tempfile::Builder;
use super::*;
use crate::*;
@ -96,7 +114,8 @@ mod tests {
fn test_integer_keys() {
let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s = k.open_multi_integer("s", StoreOptions::create()).expect("open");
macro_rules! test_integer_keys {
@ -120,7 +139,8 @@ mod tests {
fn test_clear() {
let root = Builder::new().prefix("test_multi_integer_clear").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<backend::Lmdb>(root.path()).expect("new succeeded");
let s = k.open_multi_integer("s", StoreOptions::create()).expect("open");
{

@ -8,134 +8,121 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::{
error::StoreError,
read_transform,
readwrite::{
Readable,
Writer,
},
value::Value,
use std::marker::PhantomData;
use crate::backend::{
BackendDatabase,
BackendFlags,
BackendIter,
BackendRoCursor,
BackendRwTransaction,
};
use lmdb::{
Cursor,
Database,
Iter as LmdbIter,
// IterDup as LmdbIterDup,
RoCursor,
WriteFlags,
use crate::error::StoreError;
use crate::helpers::read_transform;
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
type EmptyResult = Result<(), StoreError>;
#[derive(Copy, Clone)]
pub struct MultiStore {
db: Database,
pub struct MultiStore<D> {
db: D,
}
pub struct Iter<'env> {
iter: LmdbIter<'env>,
cursor: RoCursor<'env>,
pub struct Iter<'env, I, C> {
iter: I,
cursor: C,
phantom: PhantomData<&'env ()>,
}
impl MultiStore {
pub(crate) fn new(db: Database) -> MultiStore {
impl<D> MultiStore<D>
where
D: BackendDatabase,
{
pub(crate) fn new(db: D) -> MultiStore<D> {
MultiStore {
db,
}
}
/// Provides a cursor to all of the values for the duplicate entries that match this key
pub fn get<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Iter, StoreError> {
pub fn get<'env, R, I, C, K>(self, reader: &'env R, k: K) -> Result<Iter<'env, I, C>, StoreError>
where
R: Readable<'env, Database = D, RoCursor = C>,
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
K: AsRef<[u8]>,
{
let mut cursor = reader.open_ro_cursor(self.db)?;
let iter = cursor.iter_dup_of(k);
Ok(Iter {
iter,
cursor,
phantom: PhantomData,
})
}
/// Provides the first value that matches this key
pub fn get_first<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Option<Value>, StoreError> {
pub fn get_first<'env, R, K>(self, reader: &'env R, k: K) -> Result<Option<Value<'env>>, StoreError>
where
R: Readable<'env, Database = D>,
K: AsRef<[u8]>,
{
reader.get(self.db, &k)
}
/// Insert a value at the specified key.
/// This put will allow duplicate entries. If you wish to have duplicate entries
/// rejected, use the `put_with_flags` function and specify NO_DUP_DATA
pub fn put<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
writer.put(self.db, &k, v, WriteFlags::empty())
pub fn put<T, K>(self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.put(self.db, &k, v, T::Flags::empty())
}
pub fn put_with_flags<K: AsRef<[u8]>>(
self,
writer: &mut Writer,
k: K,
v: &Value,
flags: WriteFlags,
) -> Result<(), StoreError> {
pub fn put_with_flags<T, K>(self, writer: &mut Writer<T>, k: K, v: &Value, flags: T::Flags) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.put(self.db, &k, v, flags)
}
pub fn delete_all<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
pub fn delete_all<T, K>(self, writer: &mut Writer<T>, k: K) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.delete(self.db, &k, None)
}
pub fn delete<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
pub fn delete<T, K>(self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.delete(self.db, &k, Some(&v.to_bytes()?))
}
/* TODO - Figure out how to solve the need to have the cursor stick around when
* we are producing iterators from MultiIter
/// Provides an iterator starting at the lexicographically smallest value in the store
pub fn iter_start(&self, store: MultiStore) -> Result<MultiIter, StoreError> {
let mut cursor = self.tx.open_ro_cursor(store.0).map_err(StoreError::LmdbError)?;
// We call Cursor.iter() instead of Cursor.iter_start() because
// the latter panics at "called `Result::unwrap()` on an `Err` value:
// NotFound" when there are no items in the store, whereas the former
// returns an iterator that yields no items.
//
// And since we create the Cursor and don't change its position, we can
// be sure that a call to Cursor.iter() will start at the beginning.
//
let iter = cursor.iter_dup();
Ok(MultiIter {
iter,
cursor,
})
}
*/
pub fn clear(self, writer: &mut Writer) -> Result<(), StoreError> {
pub fn clear<T>(self, writer: &mut Writer<T>) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
{
writer.clear(self.db)
}
}
/*
fn read_transform_owned(val: Result<&[u8], lmdb::Error>) -> Result<Option<OwnedValue>, StoreError> {
match val {
Ok(bytes) => Value::from_tagged_slice(bytes).map(|v| Some(OwnedValue::from(&v))).map_err(StoreError::DataError),
Err(lmdb::Error::NotFound) => Ok(None),
Err(e) => Err(StoreError::LmdbError(e)),
}
}
impl<'env> Iterator for MultiIter<'env> {
type Item = Iter<'env>;
fn next(&mut self) -> Option<Self::Item> {
match self.iter.next() {
None => None,
Some(iter) => Some(Iter {
iter,
cursor,
}),
}
}
}
*/
impl<'env> Iterator for Iter<'env> {
impl<'env, I, C> Iterator for Iter<'env, I, C>
where
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
{
type Item = Result<(&'env [u8], Option<Value<'env>>), StoreError>;
fn next(&mut self) -> Option<Self::Item> {
@ -145,7 +132,7 @@ impl<'env> Iterator for Iter<'env> {
Ok(val) => Some(Ok((key, val))),
Err(err) => Some(Err(err)),
},
Some(Err(err)) => Some(Err(StoreError::LmdbError(err))),
Some(Err(err)) => Some(Err(err.into())),
}
}
}
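As the doc comments above note, `put` on a `MultiStore` allows duplicate entries under one key, and `get` hands back an iterator over all of them. A short sketch against the LMDB backend, assuming an environment opened as elsewhere in this diff:

use rkv::backend::{LmdbDatabase, LmdbEnvironment};
use rkv::{MultiStore, Rkv, StoreError, StoreOptions, Value};

fn tags_demo(k: &Rkv<LmdbEnvironment>) -> Result<(), StoreError> {
    let tags: MultiStore<LmdbDatabase> = k.open_multi("tags", StoreOptions::create())?;
    let mut writer = k.write()?;
    // Duplicate entries are allowed: both values end up under the same key.
    tags.put(&mut writer, "colors", &Value::Str("red"))?;
    tags.put(&mut writer, "colors", &Value::Str("blue"))?;
    // `get` yields every value stored under "colors".
    for item in tags.get(&writer, "colors")? {
        let (_key, value) = item?;
        println!("{:?}", value);
    }
    writer.commit()
}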

@ -8,54 +8,77 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use crate::{
error::StoreError,
read_transform,
readwrite::{
Readable,
Writer,
},
value::Value,
use std::marker::PhantomData;
use crate::backend::{
BackendDatabase,
BackendFlags,
BackendIter,
BackendRoCursor,
BackendRwTransaction,
};
use lmdb::{
Cursor,
Database,
Iter as LmdbIter,
RoCursor,
WriteFlags,
use crate::error::StoreError;
use crate::helpers::read_transform;
use crate::readwrite::{
Readable,
Writer,
};
use crate::value::Value;
type EmptyResult = Result<(), StoreError>;
#[derive(Copy, Clone)]
pub struct SingleStore {
db: Database,
pub struct SingleStore<D> {
db: D,
}
pub struct Iter<'env> {
iter: LmdbIter<'env>,
cursor: RoCursor<'env>,
pub struct Iter<'env, I, C> {
iter: I,
cursor: C,
phantom: PhantomData<&'env ()>,
}
impl SingleStore {
pub(crate) fn new(db: Database) -> SingleStore {
impl<D> SingleStore<D>
where
D: BackendDatabase,
{
pub(crate) fn new(db: D) -> SingleStore<D> {
SingleStore {
db,
}
}
pub fn get<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Option<Value>, StoreError> {
pub fn get<'env, R, K>(self, reader: &'env R, k: K) -> Result<Option<Value<'env>>, StoreError>
where
R: Readable<'env, Database = D>,
K: AsRef<[u8]>,
{
reader.get(self.db, &k)
}
// TODO: flags
pub fn put<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K, v: &Value) -> Result<(), StoreError> {
writer.put(self.db, &k, v, WriteFlags::empty())
pub fn put<T, K>(self, writer: &mut Writer<T>, k: K, v: &Value) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.put(self.db, &k, v, T::Flags::empty())
}
pub fn delete<K: AsRef<[u8]>>(self, writer: &mut Writer, k: K) -> Result<(), StoreError> {
pub fn delete<T, K>(self, writer: &mut Writer<T>, k: K) -> EmptyResult
where
T: BackendRwTransaction<Database = D>,
K: AsRef<[u8]>,
{
writer.delete(self.db, &k, None)
}
pub fn iter_start<T: Readable>(self, reader: &T) -> Result<Iter, StoreError> {
pub fn iter_start<'env, R, I, C>(self, reader: &'env R) -> Result<Iter<'env, I, C>, StoreError>
where
R: Readable<'env, Database = D, RoCursor = C>,
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
{
let mut cursor = reader.open_ro_cursor(self.db)?;
// We call Cursor.iter() instead of Cursor.iter_start() because
@ -71,24 +94,41 @@ impl SingleStore {
Ok(Iter {
iter,
cursor,
phantom: PhantomData,
})
}
pub fn iter_from<T: Readable, K: AsRef<[u8]>>(self, reader: &T, k: K) -> Result<Iter, StoreError> {
pub fn iter_from<'env, R, I, C, K>(self, reader: &'env R, k: K) -> Result<Iter<'env, I, C>, StoreError>
where
R: Readable<'env, Database = D, RoCursor = C>,
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
K: AsRef<[u8]>,
{
let mut cursor = reader.open_ro_cursor(self.db)?;
let iter = cursor.iter_from(k);
Ok(Iter {
iter,
cursor,
phantom: PhantomData,
})
}
pub fn clear(self, writer: &mut Writer) -> Result<(), StoreError> {
pub fn clear<T>(self, writer: &mut Writer<T>) -> EmptyResult
where
D: BackendDatabase,
T: BackendRwTransaction<Database = D>,
{
writer.clear(self.db)
}
}
impl<'env> Iterator for Iter<'env> {
impl<'env, I, C> Iterator for Iter<'env, I, C>
where
I: BackendIter<'env>,
C: BackendRoCursor<'env, Iter = I>,
{
type Item = Result<(&'env [u8], Option<Value<'env>>), StoreError>;
fn next(&mut self) -> Option<Self::Item> {
@ -98,7 +138,7 @@ impl<'env> Iterator for Iter<'env> {
Ok(val) => Some(Ok((key, val))),
Err(err) => Some(Err(err)),
},
Some(Err(err)) => Some(Err(StoreError::LmdbError(err))),
Some(Err(err)) => Some(Err(err.into())),
}
}
}
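The comment above explains why iteration goes through `Cursor.iter()`: an empty store then yields an empty iterator instead of panicking, and `iter_from` positions the cursor at the first key greater than or equal to the one given (as the earlier tests exercise). A small sketch against the LMDB backend, assuming the usual `Rkv::read()` accessor for read transactions:

use rkv::backend::{LmdbDatabase, LmdbEnvironment};
use rkv::{Rkv, SingleStore, StoreError, StoreOptions};

fn dump_from(k: &Rkv<LmdbEnvironment>, from: &str) -> Result<(), StoreError> {
    let sk: SingleStore<LmdbDatabase> = k.open_single("sk", StoreOptions::create())?;
    let reader = k.read()?;
    // Starts at the first key >= `from`; on an empty store this loop
    // simply runs zero times rather than erroring.
    for item in sk.iter_from(&reader, from)? {
        let (key, value) = item?;
        println!("{:?} => {:?}", std::str::from_utf8(key), value);
    }
    Ok(())
}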

@ -8,6 +8,8 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fmt;
use arrayref::array_ref;
use bincode::{
deserialize,
@ -15,7 +17,6 @@ use bincode::{
serialized_size,
};
use ordered_float::OrderedFloat;
use uuid::{
Bytes,
Uuid,
@ -68,8 +69,8 @@ impl Type {
}
}
impl ::std::fmt::Display for Type {
fn fmt(&self, f: &mut ::std::fmt::Formatter) -> Result<(), ::std::fmt::Error> {
impl fmt::Display for Type {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.write_str(match *self {
Type::Bool => "bool",
Type::U64 => "u64",
@ -232,9 +233,10 @@ impl<'s> From<&'s OwnedValue> for Value<'s> {
#[cfg(test)]
mod tests {
use super::*;
use ordered_float::OrderedFloat;
use super::*;
#[test]
fn test_value_serialized_size() {
// | Value enum | tag: 1 byte | value_payload |

@ -8,21 +8,25 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fs;
use serde_derive::Serialize;
use tempfile::Builder;
use rkv::backend::Lmdb;
use rkv::{
PrimitiveInt,
Rkv,
StoreOptions,
Value,
};
use serde_derive::Serialize;
use std::fs;
use tempfile::Builder;
#[test]
fn test_integer_keys() {
let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let s = k.open_integer("s", StoreOptions::create()).expect("open");
macro_rules! test_integer_keys {

@ -8,15 +8,16 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fs;
use std::sync::Arc;
use tempfile::Builder;
use rkv::backend::Lmdb;
use rkv::{
Manager,
Rkv,
};
use std::{
fs,
sync::Arc,
};
use tempfile::Builder;
#[test]
// Identical to the same-named unit test, but this one confirms that it works
@ -28,7 +29,7 @@ fn test_same() {
let p = root.path();
assert!(Manager::singleton().read().unwrap().get(p).expect("success").is_none());
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new).expect("created");
let created_arc = Manager::singleton().write().unwrap().get_or_create(p, Rkv::new::<Lmdb>).expect("created");
let fetched_arc = Manager::singleton().read().unwrap().get(p).expect("success").expect("existed");
assert!(Arc::ptr_eq(&created_arc, &fetched_arc));
}

@ -8,21 +8,25 @@
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fs;
use serde_derive::Serialize;
use tempfile::Builder;
use rkv::backend::Lmdb;
use rkv::{
PrimitiveInt,
Rkv,
StoreOptions,
Value,
};
use serde_derive::Serialize;
use std::fs;
use tempfile::Builder;
#[test]
fn test_multi_integer_keys() {
let root = Builder::new().prefix("test_integer_keys").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let s = k.open_multi_integer("s", StoreOptions::create()).expect("open");
macro_rules! test_integer_keys {

@ -1,4 +1,32 @@
/// consider a struct like this
// Copyright 2018-2019 Mozilla
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use
// this file except in compliance with the License. You may obtain a copy of the
// License at http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software distributed
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
// CONDITIONS OF ANY KIND, either express or implied. See the License for the
// specific language governing permissions and limitations under the License.
use std::fs;
use tempfile::Builder;
use rkv::backend::{
Lmdb,
LmdbDatabase,
LmdbRoCursor,
LmdbRwTransaction,
};
use rkv::{
Readable,
Rkv,
StoreOptions,
Value,
Writer,
};
/// Consider a struct like this:
/// struct Sample {
/// id: u64,
/// value: String,
@ -7,29 +35,19 @@
/// We would like to index all of the fields so that we can search for the struct not only by ID
/// but also by value and date. When we index the fields individually in their own tables, it
/// is important that we run all operations within a single transaction to ensure coherence of
/// the indices
/// the indices.
/// This test features helper functions for reading and writing the parts of the struct.
/// Note that the reader functions take `Readable` because they might run within a Read
/// Transaction or a Write Transaction. The test demonstrates fetching values via both.
use rkv::{
MultiStore,
Readable,
Rkv,
SingleStore,
StoreOptions,
Value,
Writer,
};
use tempfile::Builder;
use std::fs;
type SingleStore = rkv::SingleStore<LmdbDatabase>;
type MultiStore = rkv::MultiStore<LmdbDatabase>;
#[test]
fn read_many() {
let root = Builder::new().prefix("test_txns").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
let k = Rkv::new(root.path()).expect("new succeeded");
let k = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let samplestore = k.open_single("s", StoreOptions::create()).expect("open");
let datestore = k.open_multi("m", StoreOptions::create()).expect("open");
let valuestore = k.open_multi("m", StoreOptions::create()).expect("open");
@ -71,7 +89,10 @@ fn read_many() {
}
}
fn get_ids_by_field<Txn: Readable>(txn: &Txn, store: MultiStore, field: &str) -> Vec<u64> {
fn get_ids_by_field<'env, T>(txn: &'env T, store: MultiStore, field: &str) -> Vec<u64>
where
T: Readable<'env, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'env>>,
{
store
.get(txn, field)
.expect("get iterator")
@ -82,7 +103,10 @@ fn get_ids_by_field<Txn: Readable>(txn: &Txn, store: MultiStore, field: &str) ->
.collect::<Vec<u64>>()
}
fn get_samples<Txn: Readable>(txn: &Txn, samplestore: SingleStore, ids: &[u64]) -> Vec<String> {
fn get_samples<'env, T>(txn: &'env T, samplestore: SingleStore, ids: &[u64]) -> Vec<String>
where
T: Readable<'env, Database = LmdbDatabase, RoCursor = LmdbRoCursor<'env>>,
{
ids.iter()
.map(|id| {
let bytes = id.to_be_bytes();
@ -95,11 +119,11 @@ fn get_samples<Txn: Readable>(txn: &Txn, samplestore: SingleStore, ids: &[u64])
.collect::<Vec<String>>()
}
fn put_sample(txn: &mut Writer, samplestore: SingleStore, id: u64, value: &str) {
fn put_sample(txn: &mut Writer<LmdbRwTransaction>, samplestore: SingleStore, id: u64, value: &str) {
let idbytes = id.to_be_bytes();
samplestore.put(txn, &idbytes, &Value::Str(value)).expect("put id");
}
fn put_id_field(txn: &mut Writer, store: MultiStore, field: &str, id: u64) {
fn put_id_field(txn: &mut Writer<LmdbRwTransaction>, store: MultiStore, field: &str, id: u64) {
store.put(txn, field, &Value::U64(id)).expect("put id");
}
