Merge pull request #207 from mozilla/dealing-with-corrupted

Add a way to discard corrupted databases on load
without.crypto
Victor Porof 4 years ago committed by GitHub
commit 8b28b1b8e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 16
      src/backend/impl_lmdb/environment.rs
  2. 30
      src/backend/impl_safe/environment.rs
  3. 4
      src/backend/traits.rs
  4. 19
      src/env.rs
  5. 36
      src/error.rs
  6. 1
      src/lib.rs
  7. 26
      src/manager.rs
  8. 14
      src/migrator.rs
  9. 13
      src/store.rs
  10. 121
      tests/manager.rs

@ -44,7 +44,7 @@ pub struct EnvironmentBuilderImpl {
env_path_type: EnvironmentPathType,
env_lock_type: EnvironmentLockType,
env_db_type: EnvironmentDefaultDbType,
make_dir: bool,
make_dir_if_needed: bool,
}
impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
@ -58,7 +58,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
env_path_type: EnvironmentPathType::SubDir,
env_lock_type: EnvironmentLockType::Lockfile,
env_db_type: EnvironmentDefaultDbType::SingleDatabase,
make_dir: false,
make_dir_if_needed: false,
}
}
@ -95,11 +95,17 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}
fn set_make_dir_if_needed(&mut self, make_dir: bool) -> &mut Self {
self.make_dir = make_dir;
fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self {
self.make_dir_if_needed = make_dir_if_needed;
self
}
fn set_discard_if_corrupted(&mut self, _discard_if_corrupted: bool) -> &mut Self {
// Unfortunately, when opening a database, LMDB doesn't handle all the ways it could have
// been corrupted. Prefer using the `SafeMode` backend if this is important.
unimplemented!();
}
fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error> {
match self.env_path_type {
EnvironmentPathType::NoSubDir => {
@ -109,7 +115,7 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
},
EnvironmentPathType::SubDir => {
if !path.is_dir() {
if !self.make_dir {
if !self.make_dir_if_needed {
return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into()));
}
fs::create_dir_all(path)?;

@ -55,7 +55,8 @@ pub struct EnvironmentBuilderImpl {
max_readers: Option<usize>,
max_dbs: Option<usize>,
map_size: Option<usize>,
make_dir: bool,
make_dir_if_needed: bool,
discard_if_corrupted: bool,
}
impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
@ -69,7 +70,8 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
max_readers: None,
max_dbs: None,
map_size: None,
make_dir: false,
make_dir_if_needed: false,
discard_if_corrupted: false,
}
}
@ -96,8 +98,13 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
self
}
fn set_make_dir_if_needed(&mut self, make_dir: bool) -> &mut Self {
self.make_dir = make_dir;
fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self {
self.make_dir_if_needed = make_dir_if_needed;
self
}
fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self {
self.discard_if_corrupted = discard_if_corrupted;
self
}
@ -105,13 +112,13 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
// Technically NO_SUB_DIR should change these checks here, but they're both currently
// unimplemented with this storage backend.
if !path.is_dir() {
if !self.make_dir {
if !self.make_dir_if_needed {
return Err(ErrorImpl::UnsuitableEnvironmentPath(path.into()));
}
fs::create_dir_all(path)?;
}
let mut env = EnvironmentImpl::new(path, self.flags, self.max_readers, self.max_dbs, self.map_size)?;
env.read_from_disk()?;
env.read_from_disk(self.discard_if_corrupted)?;
Ok(env)
}
}
@ -153,10 +160,13 @@ impl EnvironmentImpl {
Ok(bincode::serialize(&data)?)
}
fn deserialize(bytes: &[u8]) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
fn deserialize(bytes: &[u8], discard_if_corrupted: bool) -> Result<(DatabaseArena, DatabaseNameMap), ErrorImpl> {
let mut arena = DatabaseArena::new();
let mut name_map = HashMap::new();
let data: HashMap<_, _> = bincode::deserialize(&bytes)?;
let data: HashMap<_, _> = match bincode::deserialize(&bytes) {
Err(_) if discard_if_corrupted => Ok(HashMap::new()),
result => result,
}?;
for (name, db) in data {
name_map.insert(name, DatabaseImpl(arena.alloc(db)));
}
@ -194,7 +204,7 @@ impl EnvironmentImpl {
})
}
pub(crate) fn read_from_disk(&mut self) -> Result<(), ErrorImpl> {
pub(crate) fn read_from_disk(&mut self, discard_if_corrupted: bool) -> Result<(), ErrorImpl> {
let mut path = Cow::from(&self.path);
if fs::metadata(&path)?.is_dir() {
path.to_mut().push(DEFAULT_DB_FILENAME);
@ -202,7 +212,7 @@ impl EnvironmentImpl {
if fs::metadata(&path).is_err() {
return Ok(());
};
let (arena, name_map) = Self::deserialize(&fs::read(&path)?)?;
let (arena, name_map) = Self::deserialize(&fs::read(&path)?, discard_if_corrupted)?;
self.dbs = RwLock::new(EnvironmentDbs {
arena,
name_map,

@ -91,7 +91,9 @@ pub trait BackendEnvironmentBuilder<'b>: Debug + Eq + PartialEq + Copy + Clone {
fn set_map_size(&mut self, size: usize) -> &mut Self;
fn set_make_dir_if_needed(&mut self, make_dir: bool) -> &mut Self;
fn set_make_dir_if_needed(&mut self, make_dir_if_needed: bool) -> &mut Self;
fn set_discard_if_corrupted(&mut self, discard_if_corrupted: bool) -> &mut Self;
fn open(&self, path: &Path) -> Result<Self::Environment, Self::Error>;
}

@ -30,13 +30,17 @@ use crate::{
BackendRwCursorTransaction,
SafeModeError,
},
error::StoreError,
error::{
CloseError,
StoreError,
},
readwrite::{
Reader,
Writer,
},
store::{
single::SingleStore,
CloseOptions,
Options as StoreOptions,
},
};
@ -310,15 +314,16 @@ where
self.env.set_map_size(size).map_err(Into::into)
}
/// Closes this environment and deletes all its files from disk. Doesn't delete the
/// folder used when opening the environment.
pub fn close_and_delete(self) -> Result<(), StoreError> {
/// Closes this environment and optionally deletes all its files from disk. Doesn't
/// delete the folder used when opening the environment.
pub fn close(self, options: CloseOptions) -> Result<(), CloseError> {
let files = self.env.get_files_on_disk();
self.sync(true)?;
drop(self);
for file in files {
fs::remove_file(file)?;
if options.delete {
for file in files {
fs::remove_file(file)?;
}
}
Ok(())

@ -131,11 +131,41 @@ impl<T> From<sync::PoisonError<T>> for StoreError {
}
}
#[derive(Debug, Fail)]
pub enum CloseError {
#[fail(display = "manager poisoned")]
ManagerPoisonError,
#[fail(display = "close attempted while manager has an environment still open")]
EnvironmentStillOpen,
#[fail(display = "close attempted while an environment not known to the manager is still open")]
UnknownEnvironmentStillOpen,
#[fail(display = "I/O error: {:?}", _0)]
IoError(io::Error),
}
impl<T> From<sync::PoisonError<T>> for CloseError {
fn from(_: sync::PoisonError<T>) -> CloseError {
CloseError::ManagerPoisonError
}
}
impl From<io::Error> for CloseError {
fn from(e: io::Error) -> CloseError {
CloseError::IoError(e)
}
}
#[derive(Debug, Fail)]
pub enum MigrateError {
#[fail(display = "store error: {}", _0)]
StoreError(StoreError),
#[fail(display = "close error: {}", _0)]
CloseError(CloseError),
#[fail(display = "manager poisoned")]
ManagerPoisonError,
@ -152,6 +182,12 @@ impl From<StoreError> for MigrateError {
}
}
impl From<CloseError> for MigrateError {
fn from(e: CloseError) -> MigrateError {
MigrateError::CloseError(e)
}
}
impl<T> From<sync::PoisonError<T>> for MigrateError {
fn from(_: sync::PoisonError<T>) -> MigrateError {
MigrateError::ManagerPoisonError

@ -234,6 +234,7 @@ pub use readwrite::{
pub use store::{
keys::EncodableKey,
single::SingleStore,
CloseOptions,
Options as StoreOptions,
};
pub use value::{

@ -34,12 +34,17 @@ use crate::{
LmdbEnvironment,
SafeModeEnvironment,
},
error::StoreError,
error::{
CloseError,
StoreError,
},
helpers::canonicalize_path,
store::CloseOptions,
Rkv,
};
type Result<T> = result::Result<T, StoreError>;
type CloseResult<T> = result::Result<T, CloseError>;
type SharedRkv<E> = Arc<RwLock<Rkv<E>>>;
lazy_static! {
@ -146,10 +151,9 @@ where
})
}
/// Tries to close the specified environment and delete all its files from disk.
/// Doesn't delete the folder used when opening the environment.
/// This will only work if there's no other users of this environment.
pub fn try_close_and_delete<'p, P>(&mut self, path: P) -> Result<()>
/// Tries to close the specified environment.
/// Returns an error when other users of this environment still exist.
pub fn try_close<'p, P>(&mut self, path: P, options: CloseOptions) -> CloseResult<()>
where
P: Into<&'p Path>,
{
@ -159,16 +163,14 @@ where
canonicalize_path(path)?
};
match self.environments.entry(canonical) {
Entry::Vacant(_) => {}, // noop
Entry::Vacant(_) => Ok(()),
Entry::Occupied(e) if Arc::strong_count(e.get()) > 1 => Err(CloseError::EnvironmentStillOpen),
Entry::Occupied(e) => {
if Arc::strong_count(e.get()) == 1 {
if let Ok(env) = Arc::try_unwrap(e.remove()) {
env.into_inner()?.close_and_delete()?;
}
}
let env = Arc::try_unwrap(e.remove()).map_err(|_| CloseError::UnknownEnvironmentStillOpen)?;
env.into_inner()?.close(options)?;
Ok(())
},
}
Ok(())
}
}

@ -108,7 +108,10 @@ macro_rules! fn_migrator {
F: FnOnce(crate::backend::$builder) -> crate::backend::$builder,
D: std::ops::Deref<Target = Rkv<$dst_env>>,
{
use crate::backend::*;
use crate::{
backend::*,
CloseOptions,
};
let mut manager = crate::Manager::<$src_env>::singleton().write()?;
let mut builder = Rkv::<$src_env>::environment_builder::<$builder>();
@ -119,7 +122,7 @@ macro_rules! fn_migrator {
Migrator::$migrate(src_env.read()?, dst_env)?;
drop(src_env);
manager.try_close_and_delete(path)?;
manager.try_close(path, CloseOptions::delete_files_on_disk())?;
Ok(())
}
@ -138,10 +141,17 @@ macro_rules! fn_migrator {
D: std::ops::Deref<Target = Rkv<$dst_env>>,
{
match Migrator::$migrate(path, |builder| builder, dst_env) {
// Source environment is corrupted.
Err(crate::MigrateError::StoreError(crate::StoreError::FileInvalid)) => Ok(()),
// Path not accessible.
Err(crate::MigrateError::StoreError(crate::StoreError::IoError(_))) => Ok(()),
// Path accessible but incompatible for configuration.
Err(crate::MigrateError::StoreError(crate::StoreError::UnsuitableEnvironmentPath(_))) => Ok(()),
// Couldn't close source environment and delete files on disk (e.g. other stores still open).
Err(crate::MigrateError::CloseError(_)) => Ok(()),
// Nothing to migrate.
Err(crate::MigrateError::SourceEmpty) => Ok(()),
// Migrating would overwrite.
Err(crate::MigrateError::DestinationNotEmpty) => Ok(()),
result => result,
}?;

@ -39,3 +39,16 @@ where
}
}
}
#[derive(Default, Debug, Copy, Clone)]
pub struct CloseOptions {
pub delete: bool,
}
impl CloseOptions {
pub fn delete_files_on_disk() -> CloseOptions {
CloseOptions {
delete: true,
}
}
}

@ -17,12 +17,16 @@ use tempfile::Builder;
use rkv::{
backend::{
BackendEnvironmentBuilder,
Lmdb,
LmdbEnvironment,
SafeMode,
SafeModeEnvironment,
},
CloseOptions,
Rkv,
StoreOptions,
Value,
};
/// Test that a manager can be created with simple type inference.
@ -134,3 +138,120 @@ fn test_same_with_capacity_safe() {
let fetched_arc = manager.get(p).expect("success").expect("existed");
assert!(Arc::ptr_eq(&created_arc, &fetched_arc));
}
/// Some storage drivers are able to discard when the database is corrupted at runtime.
/// Test how these managers can discard corrupted databases and re-open.
#[test]
fn test_safe_mode_corrupt_while_open_1() {
type Manager = rkv::Manager<SafeModeEnvironment>;
let root = Builder::new().prefix("test_safe_mode_corrupt_while_open_1").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
// Create environment.
let mut manager = Manager::singleton().write().unwrap();
let shared_env = manager.get_or_create(root.path(), Rkv::new::<SafeMode>).expect("created");
let env = shared_env.read().unwrap();
// Write some data.
let store = env.open_single("store", StoreOptions::create()).expect("opened");
let mut writer = env.write().expect("writer");
store.put(&mut writer, "foo", &Value::I64(1234)).expect("wrote");
store.put(&mut writer, "bar", &Value::Bool(true)).expect("wrote");
store.put(&mut writer, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");
// Verify it was flushed to disk.
let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");
assert!(safebin.exists());
// Oops, corruption.
fs::write(&safebin, "bogus").expect("dbfile corrupted");
// Close everything.
drop(env);
drop(shared_env);
manager.try_close(root.path(), CloseOptions::default()).expect("closed without deleting");
assert!(manager.get(root.path()).expect("success").is_none());
// Recreating environment fails.
manager.get_or_create(root.path(), Rkv::new::<SafeMode>).expect_err("not created");
assert!(manager.get(root.path()).expect("success").is_none());
// But we can use a builder and pass `discard_if_corrupted` to deal with it.
let mut builder = Rkv::environment_builder::<SafeMode>();
builder.set_discard_if_corrupted(true);
manager.get_or_create_from_builder(root.path(), builder, Rkv::from_builder::<SafeMode>).expect("created");
assert!(manager.get(root.path()).expect("success").is_some());
}
/// Some storage drivers are able to recover when the database is corrupted at runtime.
/// Test how these managers can recover corrupted databases while open.
#[test]
fn test_safe_mode_corrupt_while_open_2() {
type Manager = rkv::Manager<SafeModeEnvironment>;
let root = Builder::new().prefix("test_safe_mode_corrupt_while_open_2").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
// Create environment.
let mut manager = Manager::singleton().write().unwrap();
let shared_env = manager.get_or_create(root.path(), Rkv::new::<SafeMode>).expect("created");
let env = shared_env.read().unwrap();
// Write some data.
let store = env.open_single("store", StoreOptions::create()).expect("opened");
let mut writer = env.write().expect("writer");
store.put(&mut writer, "foo", &Value::I64(1234)).expect("wrote");
store.put(&mut writer, "bar", &Value::Bool(true)).expect("wrote");
store.put(&mut writer, "baz", &Value::Str("héllo, yöu")).expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");
// Verify it was flushed to disk.
let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");
assert!(safebin.exists());
// Oops, corruption.
fs::write(&safebin, "bogus").expect("dbfile corrupted");
// Reading still works. Magic.
let store = env.open_single("store", StoreOptions::default()).expect("opened");
let reader = env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
reader.abort();
// Writing still works, dbfile will be un-corrupted.
let store = env.open_single("store", StoreOptions::default()).expect("opened");
let mut writer = env.write().expect("writer");
store.put(&mut writer, "foo2", &Value::I64(5678)).expect("wrote");
store.put(&mut writer, "bar2", &Value::Bool(false)).expect("wrote");
store.put(&mut writer, "baz2", &Value::Str("byé, yöu")).expect("wrote");
writer.commit().expect("committed");
env.sync(true).expect("synced");
// Close everything.
drop(env);
drop(shared_env);
manager.try_close(root.path(), CloseOptions::default()).expect("closed without deleting");
assert!(manager.get(root.path()).expect("success").is_none());
// Recreate environment.
let shared_env = manager.get_or_create(root.path(), Rkv::new::<SafeMode>).expect("created");
let env = shared_env.read().unwrap();
// Verify that the dbfile is not corrupted.
let store = env.open_single("store", StoreOptions::default()).expect("opened");
let reader = env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
assert_eq!(store.get(&reader, "foo2").expect("read"), Some(Value::I64(5678)));
assert_eq!(store.get(&reader, "bar2").expect("read"), Some(Value::Bool(false)));
assert_eq!(store.get(&reader, "baz2").expect("read"), Some(Value::Str("byé, yöu")));
}

Loading…
Cancel
Save