Prevent potential deadlocks in safe mode environments when opnening dbs

Signed-off-by: Victor Porof <victor.porof@gmail.com>
without.crypto
Victor Porof 4 years ago
parent 2d93f06d39
commit b51b5e07a5
  1. 70
      src/backend/impl_safe/environment.rs
  2. 6
      src/backend/impl_safe/transaction.rs
  3. 2
      tests/env-migration.rs

@ -12,6 +12,7 @@ use std::{
borrow::Cow,
collections::HashMap,
fs,
ops::DerefMut,
path::{
Path,
PathBuf,
@ -115,32 +116,51 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
}
}
#[derive(Debug)]
pub(crate) struct EnvironmentDbs {
pub(crate) arena: DatabaseArena,
pub(crate) name_map: DatabaseNameMap,
}
#[derive(Debug)]
pub(crate) struct EnvironmentDbsRefMut<'a> {
pub(crate) arena: &'a mut DatabaseArena,
pub(crate) name_map: &'a mut DatabaseNameMap,
}
impl<'a> From<&'a mut EnvironmentDbs> for EnvironmentDbsRefMut<'a> {
fn from(dbs: &mut EnvironmentDbs) -> EnvironmentDbsRefMut {
EnvironmentDbsRefMut {
arena: &mut dbs.arena,
name_map: &mut dbs.name_map,
}
}
}
#[derive(Debug)]
pub struct EnvironmentImpl {
path: PathBuf,
max_dbs: usize,
arena: RwLock<DatabaseArena>,
dbs: RwLock<DatabaseNameMap>,
dbs: RwLock<EnvironmentDbs>,
ro_txns: Arc<()>,
rw_txns: Arc<()>,
}
impl EnvironmentImpl {
fn serialize(&self) -> Result<Vec<u8>, ErrorImpl> {
let arena = self.arena.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let data: HashMap<_, _> = dbs.iter().map(|(name, id)| (name, &arena[id.0])).collect();
let data: HashMap<_, _> = dbs.name_map.iter().map(|(name, id)| (name, &dbs.arena[id.0])).collect();
Ok(bincode::serialize(&data)?)
}
fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut dbs = HashMap::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = bincode::deserialize(&bytes)?;
for (name, db) in data {
dbs.insert(name, DatabaseImpl(arena.alloc(db)));
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
Ok((arena, dbs))
Ok((arena, name_map))
}
}
@ -165,8 +185,10 @@ impl EnvironmentImpl {
Ok(EnvironmentImpl {
path: path.to_path_buf(),
max_dbs: max_dbs.unwrap_or(std::usize::MAX),
arena: RwLock::new(DatabaseArena::new()),
dbs: RwLock::new(HashMap::new()),
dbs: RwLock::new(EnvironmentDbs {
arena: DatabaseArena::new(),
name_map: HashMap::new(),
}),
ro_txns: Arc::new(()),
rw_txns: Arc::new(()),
})
@ -180,9 +202,11 @@ impl EnvironmentImpl {
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, dbs) = Self::deserialize(&fs::read(&path)?)?;
self.arena = RwLock::new(arena);
self.dbs = RwLock::new(dbs);
let (arena, name_map) = Self::deserialize(&fs::read(&path)?)?;
self.dbs = RwLock::new(EnvironmentDbs {
arena,
name_map,
});
Ok(())
}
@ -195,12 +219,12 @@ impl EnvironmentImpl {
Ok(())
}
pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<DatabaseArena>, ErrorImpl> {
self.arena.read().map_err(|_| ErrorImpl::EnvPoisonError)
pub(crate) fn dbs(&self) -> Result<RwLockReadGuard<EnvironmentDbs>, ErrorImpl> {
self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)
}
pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<DatabaseArena>, ErrorImpl> {
self.arena.write().map_err(|_| ErrorImpl::EnvPoisonError)
pub(crate) fn dbs_mut(&self) -> Result<RwLockWriteGuard<EnvironmentDbs>, ErrorImpl> {
self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)
}
}
@ -215,7 +239,7 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
Ok(dbs.keys().map(|key| key.to_owned()).collect())
Ok(dbs.name_map.keys().map(|key| key.to_owned()).collect())
}
fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
@ -225,8 +249,8 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let dbs = self.dbs.read().map_err(|_| ErrorImpl::EnvPoisonError)?;
let id = dbs.get(&key).ok_or(ErrorImpl::DbNotFoundError)?;
Ok(*id)
let db = dbs.name_map.get(&key).ok_or(ErrorImpl::DbNotFoundError)?;
Ok(*db)
}
fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
@ -236,11 +260,13 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
// TOOD: don't reallocate `name`.
let key = name.map(String::from);
let mut dbs = self.dbs.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
let mut arena = self.arena.write().map_err(|_| ErrorImpl::EnvPoisonError)?;
if dbs.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name != None {
if dbs.name_map.keys().filter_map(|k| k.as_ref()).count() >= self.max_dbs && name != None {
return Err(ErrorImpl::DbsFull);
}
let id = dbs.entry(key).or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None))));
let parts = EnvironmentDbsRefMut::from(dbs.deref_mut());
let arena = parts.arena;
let name_map = parts.name_map;
let id = name_map.entry(key).or_insert_with(|| DatabaseImpl(arena.alloc(Database::new(Some(flags), None))));
Ok(*id)
}

@ -37,7 +37,7 @@ pub struct RoTransactionImpl<'t> {
impl<'t> RoTransactionImpl<'t> {
pub(crate) fn new(env: &'t EnvironmentImpl, idx: Arc<()>) -> Result<RoTransactionImpl<'t>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
let snapshots = env.dbs()?.arena.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
Ok(RoTransactionImpl {
env,
snapshots,
@ -78,7 +78,7 @@ pub struct RwTransactionImpl<'t> {
impl<'t> RwTransactionImpl<'t> {
pub(crate) fn new(env: &'t EnvironmentImpl, idx: Arc<()>) -> Result<RwTransactionImpl<'t>, ErrorImpl> {
let snapshots = env.dbs()?.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
let snapshots = env.dbs()?.arena.iter().map(|(id, db)| (DatabaseImpl(id), db.snapshot())).collect();
Ok(RwTransactionImpl {
env,
snapshots,
@ -144,7 +144,7 @@ impl<'t> BackendRwTransaction for RwTransactionImpl<'t> {
let mut dbs = self.env.dbs_mut()?;
for (id, snapshot) in self.snapshots {
let db = dbs.get_mut(id.0).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
let db = dbs.arena.get_mut(id.0).ok_or_else(|| ErrorImpl::DbIsForeignError)?;
db.replace(snapshot);
}

@ -428,6 +428,7 @@ fn test_easy_migrator_from_manager_failed_migration_1() {
let created_dst_arc_1 = dst_manager.get_or_create(root.path(), Rkv::new::<SafeMode>).unwrap();
let dst_env_1 = created_dst_arc_1.read().unwrap();
populate_store!(&dst_env_1);
dst_env_1.sync(true).expect("synced");
}
// Attempt to migrate again in a new env. This should *NOT* fail with DestinationNotEmpty.
@ -453,6 +454,7 @@ fn test_easy_migrator_from_manager_failed_migration_2() {
let created_dst_arc_1 = dst_manager.get_or_create(root.path(), Rkv::new::<Lmdb>).unwrap();
let dst_env_1 = created_dst_arc_1.read().unwrap();
populate_store!(&dst_env_1);
dst_env_1.sync(true).expect("synced");
}
// Attempt to migrate again in a new env. This should *NOT* fail with DestinationNotEmpty.

Loading…
Cancel
Save