Use an arena for safe mode databases

Signed-off-by: Victor Porof <victor.porof@gmail.com>
without.crypto
Victor Porof 5 years ago
parent 89c0da6522
commit eaf70e7f94
  1. 3
      Cargo.toml
  2. 2
      src/backend.rs
  3. 2
      src/backend/impl_safe.rs
  4. 53
      src/backend/impl_safe/database.rs
  5. 60
      src/backend/impl_safe/environment.rs
  6. 2
      src/backend/impl_safe/error.rs
  7. 69
      src/backend/impl_safe/transaction.rs

@ -16,7 +16,7 @@ exclude = ["/tests/envs/*"]
[features]
default = ["with-safe-mode"]
backtrace = ["failure/backtrace", "failure/std"]
with-safe-mode = ["log", "uuid/v4", "uuid/serde", "serde/derive", "serde/rc"]
with-safe-mode = ["id-arena", "log", "serde/derive"]
with-asan = ["lmdb-rkv/with-asan"]
with-fuzzer = ["lmdb-rkv/with-fuzzer"]
with-fuzzer-no-link = ["lmdb-rkv/with-fuzzer-no-link"]
@ -26,6 +26,7 @@ arrayref = "0.3"
bincode = "1.0"
bitflags = "1"
byteorder = "1"
id-arena = { version = "2.2", optional = true }
lazy_static = "1.0"
lmdb-rkv = "0.12.3"
log = { version = "0.4", optional = true }

@ -24,7 +24,7 @@ pub use impl_lmdb::RoCursorImpl as LmdbRoCursor;
pub use impl_lmdb::RoTransactionImpl as LmdbRoTransaction;
pub use impl_lmdb::RwTransactionImpl as LmdbRwTransaction;
pub use impl_safe::DatabaseImpl as SafeModeDatabase;
pub use impl_safe::DatabaseId as SafeModeDatabase;
pub use impl_safe::EnvironmentBuilderImpl as SafeMode;
pub use impl_safe::EnvironmentImpl as SafeModeEnvironment;
pub use impl_safe::ErrorImpl as SafeModeError;

@ -22,7 +22,7 @@ pub use cursor::{
RoCursorImpl,
RwCursorImpl,
};
pub use database::DatabaseImpl;
pub use database::DatabaseId;
pub use environment::{
EnvironmentBuilderImpl,
EnvironmentImpl,

@ -12,72 +12,59 @@ use std::collections::{
BTreeSet,
HashMap,
};
use std::sync::{
Arc,
RwLock,
};
use id_arena::Id;
use serde_derive::{
Deserialize,
Serialize,
};
use uuid::Uuid;
use super::{
DatabaseFlagsImpl,
ErrorImpl,
};
use super::DatabaseFlagsImpl;
use crate::backend::traits::BackendDatabase;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub type DatabaseId = Id<DatabaseImpl>;
impl BackendDatabase for DatabaseId {}
#[derive(Debug, Serialize, Deserialize)]
pub struct DatabaseImpl {
id: Uuid,
flags: DatabaseFlagsImpl,
snapshot: Arc<RwLock<Snapshot>>,
snapshot: Snapshot,
}
impl DatabaseImpl {
pub(crate) fn new(flags: Option<DatabaseFlagsImpl>, snapshot: Option<Snapshot>) -> DatabaseImpl {
DatabaseImpl {
id: Uuid::new_v4(),
flags: flags.unwrap_or_else(DatabaseFlagsImpl::default),
snapshot: Arc::new(RwLock::new(snapshot.unwrap_or_else(Snapshot::new))),
snapshot: snapshot.unwrap_or_else(|| Snapshot::new(flags)),
}
}
pub(crate) fn id(&self) -> &Uuid {
&self.id
}
pub(crate) fn flags(&self) -> &DatabaseFlagsImpl {
&self.flags
}
pub(crate) fn snapshot(&self) -> Result<Snapshot, ErrorImpl> {
let snapshot = self.snapshot.read().map_err(|_| ErrorImpl::TxnPoisonError)?;
Ok(snapshot.clone())
pub(crate) fn snapshot(&self) -> Snapshot {
self.snapshot.clone()
}
pub(crate) fn replace(&mut self, value: Snapshot) -> Result<Snapshot, ErrorImpl> {
let mut snapshot = self.snapshot.write().map_err(|_| ErrorImpl::TxnPoisonError)?;
Ok(std::mem::replace(&mut snapshot, value))
pub(crate) fn replace(&mut self, snapshot: Snapshot) -> Snapshot {
std::mem::replace(&mut self.snapshot, snapshot)
}
}
impl BackendDatabase for DatabaseImpl {}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Snapshot {
flags: DatabaseFlagsImpl,
map: HashMap<Box<[u8]>, BTreeSet<Box<[u8]>>>,
}
impl Snapshot {
pub(crate) fn new() -> Snapshot {
pub(crate) fn new(flags: Option<DatabaseFlagsImpl>) -> Snapshot {
Snapshot {
flags: flags.unwrap_or_else(DatabaseFlagsImpl::default),
map: HashMap::new(),
}
}
pub(crate) fn flags(&self) -> &DatabaseFlagsImpl {
&self.flags
}
pub(crate) fn get(&self, key: &[u8]) -> Option<&[u8]> {
self.map.get(key).and_then(|v| v.iter().next()).map(|v| v.as_ref())
}

@ -21,11 +21,13 @@ use std::sync::{
RwLockWriteGuard,
};
use id_arena::Arena;
use log::warn;
use super::{
database::DatabaseImpl,
DatabaseFlagsImpl,
DatabaseImpl,
DatabaseId,
EnvironmentFlagsImpl,
ErrorImpl,
InfoImpl,
@ -40,6 +42,9 @@ use crate::backend::traits::{
const DEFAULT_DB_FILENAME: &str = "data.safe.bin";
type DatabaseArena = Arena<DatabaseImpl>;
type DatabaseNameMap = HashMap<Option<String>, DatabaseId>;
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub struct EnvironmentBuilderImpl {
flags: EnvironmentFlagsImpl,
@ -89,13 +94,34 @@ impl<'env> BackendEnvironmentBuilder<'env> for EnvironmentBuilderImpl {
#[derive(Debug)]
pub struct EnvironmentImpl {
path: PathBuf,
dbs: RwLock<HashMap<Option<String>, DatabaseImpl>>,
arena: RwLock<DatabaseArena>,
dbs: RwLock<DatabaseNameMap>,
}
impl EnvironmentImpl {
fn serialize(&self) -> Result<Vec<u8>, ErrorImpl> {
let arena = self.arena.read().map_err(|_| ErrorImpl::DbPoisonError)?;
let dbs = self.dbs.read().map_err(|_| ErrorImpl::DbPoisonError)?;
let data: HashMap<_, _> = dbs.iter().map(|(name, id)| (name, &arena[*id])).collect();
Ok(bincode::serialize(&data)?)
}
fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut dbs = HashMap::new();
let data: HashMap<_, _> = bincode::deserialize(&bytes)?;
for (name, db) in data {
dbs.insert(name, arena.alloc(db));
}
Ok((arena, dbs))
}
}
impl EnvironmentImpl {
pub(crate) fn new(path: &Path, _flags: EnvironmentFlagsImpl) -> Result<EnvironmentImpl, ErrorImpl> {
Ok(EnvironmentImpl {
path: path.to_path_buf(),
arena: RwLock::new(DatabaseArena::new()),
dbs: RwLock::new(HashMap::new()),
})
}
@ -106,10 +132,11 @@ impl EnvironmentImpl {
path.to_mut().push(DEFAULT_DB_FILENAME);
};
if fs::metadata(&path).is_err() {
fs::write(&path, bincode::serialize(&self.dbs)?)?;
return Ok(());
};
let serialized = fs::read(&path)?;
self.dbs = bincode::deserialize(&serialized)?;
let (arena, dbs) = Self::deserialize(&fs::read(&path)?)?;
self.arena = RwLock::new(arena);
self.dbs = RwLock::new(dbs);
Ok(())
}
@ -118,22 +145,22 @@ impl EnvironmentImpl {
if fs::metadata(&path)?.is_dir() {
path.to_mut().push(DEFAULT_DB_FILENAME);
};
fs::write(&path, bincode::serialize(&self.dbs)?)?;
fs::write(&path, self.serialize()?)?;
Ok(())
}
pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<HashMap<Option<String>, DatabaseImpl>>, ErrorImpl> {
self.dbs.read().map_err(|_| ErrorImpl::DbPoisonError)
pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<DatabaseArena>, ErrorImpl> {
self.arena.read().map_err(|_| ErrorImpl::DbPoisonError)
}
pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<HashMap<Option<String>, DatabaseImpl>>, ErrorImpl> {
self.dbs.write().map_err(|_| ErrorImpl::DbPoisonError)
pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<DatabaseArena>, ErrorImpl> {
self.arena.write().map_err(|_| ErrorImpl::DbPoisonError)
}
}
impl<'env> BackendEnvironment<'env> for EnvironmentImpl {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Database = DatabaseId;
type Flags = DatabaseFlagsImpl;
type Stat = StatImpl;
type Info = InfoImpl;
@ -142,16 +169,19 @@ impl<'env> BackendEnvironment<'env> for EnvironmentImpl {
fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let dbs = self.dbs.read().map_err(|_| ErrorImpl::DbPoisonError)?;
let db = dbs.get(&name.map(String::from)).ok_or(ErrorImpl::DbNotFoundError)?.clone();
Ok(db)
let id = dbs.get(&key).ok_or(ErrorImpl::DbNotFoundError)?;
Ok(*id)
}
fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::DbPoisonError)?;
let db = dbs.entry(name.map(String::from)).or_insert_with(|| DatabaseImpl::new(Some(flags), None)).clone();
Ok(db)
let mut arena = self.arena.write().map_err(|_| ErrorImpl::DbPoisonError)?;
let id = dbs.entry(key).or_insert_with(|| arena.alloc(DatabaseImpl::new(Some(flags), None)));
Ok(*id)
}
fn begin_ro_txn(&'env self) -> Result<Self::RoTransaction, Self::Error> {

@ -22,7 +22,6 @@ pub enum ErrorImpl {
DbPoisonError,
DbNotFoundError,
DbIsForeignError,
TxnPoisonError,
IoError(io::Error),
BincodeError(BincodeError),
}
@ -36,7 +35,6 @@ impl fmt::Display for ErrorImpl {
ErrorImpl::DbPoisonError => write!(fmt, "DbPoisonError (safe mode)"),
ErrorImpl::DbNotFoundError => write!(fmt, "DbNotFoundError (safe mode)"),
ErrorImpl::DbIsForeignError => write!(fmt, "DbIsForeignError (safe mode)"),
ErrorImpl::TxnPoisonError => write!(fmt, "TxnPoisonError (safe mode)"),
ErrorImpl::IoError(e) => e.fmt(fmt),
ErrorImpl::BincodeError(e) => e.fmt(fmt),
}

@ -10,12 +10,10 @@
use std::collections::HashMap;
use uuid::Uuid;
use super::{
database::Snapshot,
DatabaseFlagsImpl,
DatabaseImpl,
DatabaseId,
EnvironmentImpl,
ErrorImpl,
RoCursorImpl,
@ -31,12 +29,12 @@ use crate::backend::traits::{
#[derive(Debug)]
pub struct RoTransactionImpl<'env> {
env: &'env EnvironmentImpl,
snapshots: HashMap<Uuid, Result<Snapshot, ErrorImpl>>,
snapshots: HashMap<DatabaseId, Snapshot>,
}
impl<'env> RoTransactionImpl<'env> {
pub(crate) fn new(env: &'env EnvironmentImpl) -> Result<RoTransactionImpl<'env>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(_, db)| (*db.id(), db.snapshot())).collect();
let snapshots = env.dbs()?.iter().map(|(id, db)| (id, db.snapshot())).collect();
Ok(RoTransactionImpl {
env,
snapshots,
@ -46,12 +44,11 @@ impl<'env> RoTransactionImpl<'env> {
impl<'env> BackendRoTransaction for RoTransactionImpl<'env> {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Database = DatabaseId;
fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> {
let snapshot = self.snapshots.get(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_ref().map_err(|_| ErrorImpl::TxnPoisonError)?;
data.get(key).ok_or_else(|| ErrorImpl::KeyValuePairNotFound)
let snapshot = self.snapshots.get(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
snapshot.get(key).ok_or_else(|| ErrorImpl::KeyValuePairNotFound)
}
fn abort(self) {
@ -63,21 +60,20 @@ impl<'env> BackendRoCursorTransaction<'env> for RoTransactionImpl<'env> {
type RoCursor = RoCursorImpl<'env>;
fn open_ro_cursor(&'env self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> {
let snapshot = self.snapshots.get(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_ref().map_err(|_| ErrorImpl::TxnPoisonError)?;
Ok(RoCursorImpl(data))
let snapshot = self.snapshots.get(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
Ok(RoCursorImpl(snapshot))
}
}
#[derive(Debug)]
pub struct RwTransactionImpl<'env> {
env: &'env EnvironmentImpl,
snapshots: HashMap<Uuid, Result<Snapshot, ErrorImpl>>,
snapshots: HashMap<DatabaseId, Snapshot>,
}
impl<'env> RwTransactionImpl<'env> {
pub(crate) fn new(env: &'env EnvironmentImpl) -> Result<RwTransactionImpl<'env>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(_, db)| (*db.id(), db.snapshot())).collect();
let snapshots = env.dbs()?.iter().map(|(id, db)| (id, db.snapshot())).collect();
Ok(RwTransactionImpl {
env,
snapshots,
@ -87,40 +83,36 @@ impl<'env> RwTransactionImpl<'env> {
impl<'env> BackendRwTransaction for RwTransactionImpl<'env> {
type Error = ErrorImpl;
type Database = DatabaseImpl;
type Database = DatabaseId;
type Flags = WriteFlagsImpl;
fn get(&self, db: &Self::Database, key: &[u8]) -> Result<&[u8], Self::Error> {
let snapshot = self.snapshots.get(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_ref().map_err(|_| ErrorImpl::TxnPoisonError)?;
data.get(key).ok_or_else(|| ErrorImpl::KeyValuePairNotFound)
let snapshot = self.snapshots.get(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
snapshot.get(key).ok_or_else(|| ErrorImpl::KeyValuePairNotFound)
}
fn put(&mut self, db: &Self::Database, key: &[u8], value: &[u8], _flags: Self::Flags) -> Result<(), Self::Error> {
let snapshot = self.snapshots.get_mut(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_mut().map_err(|_| ErrorImpl::TxnPoisonError)?;
if db.flags().contains(DatabaseFlagsImpl::DUP_SORT) {
data.put_dup(key, value);
let snapshot = self.snapshots.get_mut(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
if snapshot.flags().contains(DatabaseFlagsImpl::DUP_SORT) {
snapshot.put_dup(key, value);
} else {
data.put_one(key, value);
snapshot.put_one(key, value);
}
Ok(())
}
fn del(&mut self, db: &Self::Database, key: &[u8], value: Option<&[u8]>) -> Result<(), Self::Error> {
let snapshot = self.snapshots.get_mut(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_mut().map_err(|_| ErrorImpl::TxnPoisonError)?;
let deleted = match (value, db.flags()) {
(Some(value), flags) if flags.contains(DatabaseFlagsImpl::DUP_SORT) => data.del_exact(key, value),
_ => data.del_all(key),
let snapshot = self.snapshots.get_mut(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let deleted = match (value, snapshot.flags()) {
(Some(value), flags) if flags.contains(DatabaseFlagsImpl::DUP_SORT) => snapshot.del_exact(key, value),
_ => snapshot.del_all(key),
};
Ok(deleted.ok_or_else(|| ErrorImpl::KeyValuePairNotFound)?)
}
fn clear_db(&mut self, db: &Self::Database) -> Result<(), Self::Error> {
let snapshot = self.snapshots.get_mut(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_mut().map_err(|_| ErrorImpl::TxnPoisonError)?;
data.clear();
let snapshot = self.snapshots.get_mut(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
snapshot.clear();
Ok(())
}
@ -128,14 +120,8 @@ impl<'env> BackendRwTransaction for RwTransactionImpl<'env> {
let mut dbs = self.env.dbs_mut()?;
for (id, snapshot) in self.snapshots {
match dbs.iter_mut().find(|(_, db)| db.id() == &id) {
Some((_, db)) => {
db.replace(snapshot?)?;
},
None => {
unreachable!();
},
}
let db = dbs.get_mut(id).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
db.replace(snapshot);
}
drop(dbs);
@ -151,8 +137,7 @@ impl<'env> BackendRwCursorTransaction<'env> for RwTransactionImpl<'env> {
type RoCursor = RoCursorImpl<'env>;
fn open_ro_cursor(&'env self, db: &Self::Database) -> Result<Self::RoCursor, Self::Error> {
let snapshot = self.snapshots.get(db.id()).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let data = snapshot.as_ref().map_err(|_| ErrorImpl::TxnPoisonError)?;
Ok(RoCursorImpl(data))
let snapshot = self.snapshots.get(db).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
Ok(RoCursorImpl(snapshot))
}
}

Loading…
Cancel
Save