Make migration easier for consumers

Signed-off-by: Victor Porof <victor.porof@gmail.com>
Branch: without.crypto
Victor Porof, 4 years ago · parent d1fef9a2c8 · commit bada6fec6f
 Cargo.toml                           |   1
 src/backend/impl_lmdb/environment.rs |  62
 src/backend/impl_safe/environment.rs |   6
 src/backend/traits.rs                |   7
 src/env.rs                           |  15
 src/error.rs                         |  19
 src/lib.rs                           |   1
 src/manager.rs                       |  45
 src/migrator.rs                      |  61
 tests/env-migration.rs               | 148
 10 files changed

@@ -37,6 +37,7 @@ lazy_static = "1.0"
lmdb-rkv = "0.14"
log = "0.4"
ordered-float = "1.0"
paste = "0.1"
serde = {version = "1.0", features = ["derive", "rc"]}
serde_derive = "1.0"
url = "2.0"

@@ -10,7 +10,10 @@
use std::{
fs,
path::Path,
path::{
Path,
PathBuf,
},
};
use lmdb::Error as LmdbError;
@@ -103,18 +106,39 @@ impl<'b> BackendEnvironmentBuilder<'b> for EnvironmentBuilderImpl {
}
fs::create_dir_all(path).map_err(ErrorImpl::IoError)?;
}
self.builder.open(path).map(|env| EnvironmentImpl(env, self.envtype)).map_err(ErrorImpl::LmdbError)
self.builder
.open(path)
.map_err(ErrorImpl::LmdbError)
.and_then(|lmdbenv| EnvironmentImpl::new(path, lmdbenv, self.envtype))
}
}
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum EnvironmentType {
pub enum EnvironmentType {
SingleDatabase,
MultipleNamedDatabases,
}
#[derive(Debug)]
pub struct EnvironmentImpl(lmdb::Environment, EnvironmentType);
pub struct EnvironmentImpl {
path: PathBuf,
lmdbenv: lmdb::Environment,
envtype: EnvironmentType,
}
impl EnvironmentImpl {
pub(crate) fn new(
path: &Path,
lmdbenv: lmdb::Environment,
envtype: EnvironmentType,
) -> Result<EnvironmentImpl, ErrorImpl> {
Ok(EnvironmentImpl {
path: path.to_path_buf(),
lmdbenv,
envtype,
})
}
}
impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type Database = DatabaseImpl;
@@ -126,10 +150,10 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
type Stat = StatImpl;
fn get_dbs(&self) -> Result<Vec<Option<String>>, Self::Error> {
if self.1 == EnvironmentType::SingleDatabase {
if self.envtype == EnvironmentType::SingleDatabase {
return Ok(vec![None]);
}
let db = self.0.open_db(None).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)?;
let db = self.lmdbenv.open_db(None).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)?;
let reader = self.begin_ro_txn()?;
let cursor = reader.open_ro_cursor(&db)?;
let mut iter = cursor.into_iter();
@@ -143,35 +167,35 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
}
fn open_db(&self, name: Option<&str>) -> Result<Self::Database, Self::Error> {
self.0.open_db(name).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.open_db(name).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
}
fn create_db(&self, name: Option<&str>, flags: Self::Flags) -> Result<Self::Database, Self::Error> {
self.0.create_db(name, flags.0).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.create_db(name, flags.0).map(DatabaseImpl).map_err(ErrorImpl::LmdbError)
}
fn begin_ro_txn(&'e self) -> Result<Self::RoTransaction, Self::Error> {
self.0.begin_ro_txn().map(RoTransactionImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.begin_ro_txn().map(RoTransactionImpl).map_err(ErrorImpl::LmdbError)
}
fn begin_rw_txn(&'e self) -> Result<Self::RwTransaction, Self::Error> {
self.0.begin_rw_txn().map(RwTransactionImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.begin_rw_txn().map(RwTransactionImpl).map_err(ErrorImpl::LmdbError)
}
fn sync(&self, force: bool) -> Result<(), Self::Error> {
self.0.sync(force).map_err(ErrorImpl::LmdbError)
self.lmdbenv.sync(force).map_err(ErrorImpl::LmdbError)
}
fn stat(&self) -> Result<Self::Stat, Self::Error> {
self.0.stat().map(StatImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.stat().map(StatImpl).map_err(ErrorImpl::LmdbError)
}
fn info(&self) -> Result<Self::Info, Self::Error> {
self.0.info().map(InfoImpl).map_err(ErrorImpl::LmdbError)
self.lmdbenv.info().map(InfoImpl).map_err(ErrorImpl::LmdbError)
}
fn freelist(&self) -> Result<usize, Self::Error> {
self.0.freelist().map_err(ErrorImpl::LmdbError)
self.lmdbenv.freelist().map_err(ErrorImpl::LmdbError)
}
fn load_ratio(&self) -> Result<Option<f32>, Self::Error> {
@@ -189,6 +213,14 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
}
fn set_map_size(&self, size: usize) -> Result<(), Self::Error> {
self.0.set_map_size(size).map_err(ErrorImpl::LmdbError)
self.lmdbenv.set_map_size(size).map_err(ErrorImpl::LmdbError)
}
fn get_files_on_disk(&self) -> Vec<PathBuf> {
let mut db_filename = self.path.clone();
let mut lock_filename = self.path.clone();
db_filename.push("data.mdb");
lock_filename.push("lock.mdb");
return vec![db_filename, lock_filename];
}
}

@@ -286,4 +286,10 @@ impl<'e> BackendEnvironment<'e> for EnvironmentImpl {
warn!("`set_map_size({})` is ignored by this storage backend.", size);
Ok(())
}
fn get_files_on_disk(&self) -> Vec<PathBuf> {
let mut db_filename = self.path.clone();
db_filename.push(DEFAULT_DB_FILENAME);
return vec![db_filename];
}
}

@@ -13,7 +13,10 @@ use std::{
Debug,
Display,
},
path::Path,
path::{
Path,
PathBuf,
},
};
use crate::{
@@ -125,6 +128,8 @@ pub trait BackendEnvironment<'e>: Debug {
fn load_ratio(&self) -> Result<Option<f32>, Self::Error>;
fn set_map_size(&self, size: usize) -> Result<(), Self::Error>;
fn get_files_on_disk(&self) -> Vec<PathBuf>;
}
pub trait BackendRoTransaction: Debug {

@@ -9,6 +9,7 @@
// specific language governing permissions and limitations under the License.
use std::{
fs,
os::raw::c_uint,
path::{
Path,
@@ -308,4 +309,18 @@ where
pub fn set_map_size(&self, size: usize) -> Result<(), StoreError> {
self.env.set_map_size(size).map_err(Into::into)
}
/// Closes this environment and deletes all its files from disk. Doesn't delete the
/// folder used when opening the environment.
pub fn close_and_delete(self) -> Result<(), StoreError> {
let files = self.env.get_files_on_disk();
self.sync(true)?;
drop(self);
for file in files {
fs::remove_file(file)?;
}
Ok(())
}
}
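
For consumers, the new `close_and_delete` call replaces the manual sync/drop/remove dance. A minimal usage sketch (not part of this commit; it assumes the SafeMode backend and the store API already exported by rkv):

use std::path::Path;

use rkv::{
    backend::SafeMode,
    Rkv,
    StoreError,
    StoreOptions,
    Value,
};

fn write_then_purge(path: &Path) -> Result<(), StoreError> {
    // Open (or create) the environment and write a single value.
    let env = Rkv::new::<SafeMode>(path)?;
    let store = env.open_single("store", StoreOptions::create())?;
    let mut writer = env.write()?;
    store.put(&mut writer, "foo", &Value::I64(1234))?;
    writer.commit()?;

    // Sync, drop the environment, and remove its files (data.safe.bin for
    // the safe-mode backend); the containing folder itself is kept.
    env.close_and_delete()?;
    Ok(())
}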

@@ -12,6 +12,7 @@ use std::{
io,
path::PathBuf,
str,
sync,
thread,
thread::ThreadId,
};
@@ -56,6 +57,9 @@ impl From<Box<bincode::ErrorKind>> for DataError {
#[derive(Debug, Fail)]
pub enum StoreError {
#[fail(display = "manager poisoned")]
ManagerPoisonError,
#[fail(display = "database corrupted")]
DatabaseCorrupted,
@@ -124,11 +128,20 @@ impl From<io::Error> for StoreError {
}
}
impl<T> From<sync::PoisonError<T>> for StoreError {
fn from(_: sync::PoisonError<T>) -> StoreError {
StoreError::ManagerPoisonError
}
}
#[derive(Debug, Fail)]
pub enum MigrateError {
#[fail(display = "store error: {}", _0)]
StoreError(StoreError),
#[fail(display = "manager poisoned")]
ManagerPoisonError,
#[fail(display = "source is empty")]
SourceEmpty,
@@ -141,3 +154,9 @@ impl From<StoreError> for MigrateError {
MigrateError::StoreError(e)
}
}
impl<T> From<sync::PoisonError<T>> for MigrateError {
fn from(_: sync::PoisonError<T>) -> MigrateError {
MigrateError::ManagerPoisonError
}
}
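
The two `From<sync::PoisonError<T>>` impls let lock-acquisition failures propagate with `?` instead of an explicit `map_err` at every call site. A tiny hypothetical illustration (not from this commit) of the pattern they enable:

use std::sync::RwLock;

use rkv::StoreError;

// The `?` below compiles only because of the new generic
// `From<sync::PoisonError<T>> for StoreError`, which surfaces a poisoned
// lock as `StoreError::ManagerPoisonError`.
fn read_locked<T>(lock: &RwLock<T>) -> Result<(), StoreError> {
    let _guard = lock.read()?;
    Ok(())
}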

@@ -225,6 +225,7 @@ pub use error::{
StoreError,
};
pub use manager::Manager;
pub use migrator::Migrator;
pub use readwrite::{
Readable,
Reader,

@@ -29,6 +29,8 @@ use lazy_static::lazy_static;
use crate::{
backend::{
BackendEnvironment,
BackendEnvironmentBuilder,
LmdbEnvironment,
SafeModeEnvironment,
},
@@ -51,7 +53,10 @@ pub struct Manager<E> {
environments: BTreeMap<PathBuf, SharedRkv<E>>,
}
impl<E> Manager<E> {
impl<'e, E> Manager<E>
where
E: BackendEnvironment<'e>,
{
fn new() -> Manager<E> {
Manager {
environments: Default::default(),
@@ -98,6 +103,44 @@ impl<E> Manager<E> {
},
})
}
/// Return a new Rkv environment from the builder, or create it by calling `f`.
pub fn get_or_create_from_builder<'p, F, P, B>(&mut self, path: P, builder: B, f: F) -> Result<SharedRkv<E>>
where
F: FnOnce(&Path, B) -> Result<Rkv<E>>,
P: Into<&'p Path>,
B: BackendEnvironmentBuilder<'e, Environment = E>,
{
let canonical = canonicalize_path(path)?;
Ok(match self.environments.entry(canonical) {
Entry::Occupied(e) => e.get().clone(),
Entry::Vacant(e) => {
let k = Arc::new(RwLock::new(f(e.key().as_path(), builder)?));
e.insert(k).clone()
},
})
}
/// Tries to close the specified environment and delete all its files from disk.
/// Doesn't delete the folder used when opening the environment.
/// This will only work if there are no other users of this environment.
pub fn try_close_and_delete<'p, P>(&mut self, path: P) -> Result<()>
where
P: Into<&'p Path>,
{
let canonical = canonicalize_path(path)?;
match self.environments.entry(canonical) {
Entry::Vacant(_) => {}, // noop
Entry::Occupied(e) => {
if Arc::strong_count(e.get()) == 1 {
if let Ok(env) = Arc::try_unwrap(e.remove()) {
env.into_inner()?.close_and_delete()?;
}
}
},
}
Ok(())
}
}
impl Manager<LmdbEnvironment> {
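
Taken together, the new `Manager` methods give consumers a cached, builder-aware open plus a safe teardown. A rough usage sketch (assumptions: the Lmdb backend and a caller-supplied path; the builder tweak is only illustrative):

use std::path::Path;

use rkv::{
    backend::{
        Lmdb,
        LmdbEnvironment,
    },
    Manager,
    Rkv,
    StoreError,
};

fn open_use_and_discard(path: &Path) -> Result<(), StoreError> {
    let mut manager = Manager::<LmdbEnvironment>::singleton().write()?;

    // The first caller for this path builds the environment; later callers
    // get the cached Arc<RwLock<Rkv<_>>> back.
    let mut builder = Rkv::<LmdbEnvironment>::environment_builder::<Lmdb>();
    builder.set_max_dbs(2);
    let shared = manager.get_or_create_from_builder(path, builder, Rkv::from_builder::<Lmdb>)?;

    shared.read()?.sync(true)?;

    // Files are deleted only if nothing else still holds a handle, so drop
    // our clone of the Arc first.
    drop(shared);
    manager.try_close_and_delete(path)?;
    Ok(())
}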

@@ -47,8 +47,9 @@ pub use crate::backend::{
};
// FIXME: should parametrize this instead.
macro_rules! fn_migrator {
($name:tt, $src:ty, $dst:ty) => {
($name:tt, $src_env:ty, $dst_env:ty) => {
/// Migrate all data in all of the databases from the source environment to the destination
/// environment. This includes all key/value pairs in the main database that aren't
/// metadata about subdatabases and all key/value pairs in all subdatabases.
@@ -57,7 +58,11 @@ macro_rules! fn_migrator {
/// the given environments.
///
/// The destination environment should be empty of data, otherwise an error is returned.
pub fn $name(src_env: &Rkv<$src>, dst_env: &Rkv<$dst>) -> Result<(), MigrateError> {
pub fn $name<S, D>(src_env: S, dst_env: D) -> Result<(), MigrateError>
where
S: std::ops::Deref<Target = Rkv<$src_env>>,
D: std::ops::Deref<Target = Rkv<$dst_env>>,
{
let src_dbs = src_env.get_dbs().unwrap();
if src_dbs.is_empty() {
return Err(MigrateError::SourceEmpty);
@@ -80,12 +85,58 @@ macro_rules! fn_migrator {
Ok(())
}
};
($migrate:tt, $name:tt, $builder:tt, $src_env:ty, $dst_env:ty) => {
/// Same as the other migration methods, but automatically attempts to open the source
/// environment, ignores it if it doesn't exist or if it's empty, and finally attempts to
/// delete all of its supporting files.
pub fn $name<F, D>(path: &std::path::Path, build: F, dst_env: D) -> Result<(), MigrateError>
where
F: FnOnce(crate::backend::$builder) -> crate::backend::$builder,
D: std::ops::Deref<Target = Rkv<$dst_env>>,
{
use crate::backend::*;
let mut manager = crate::Manager::<$src_env>::singleton().write()?;
let mut builder = Rkv::<$src_env>::environment_builder::<$builder>();
builder.set_max_dbs(crate::env::DEFAULT_MAX_DBS);
builder.set_check_if_env_exists(true);
builder = build(builder);
let src_env = match manager.get_or_create_from_builder(path, builder, Rkv::from_builder::<$builder>) {
Err(crate::StoreError::EnvironmentDoesNotExistError(_)) => return Ok(()),
result => result,
}?;
match Migrator::$migrate(src_env.read()?, dst_env) {
Err(crate::MigrateError::SourceEmpty) => return Ok(()),
result => result,
}?;
drop(src_env);
manager.try_close_and_delete(path)?;
Ok(())
}
};
}
macro_rules! fns_migrator {
($src:tt, $dst:tt) => {
paste::item! {
fns_migrator!([<migrate_ $src _to_ $dst>], $src, $dst);
fns_migrator!([<migrate_ $dst _to_ $src>], $dst, $src);
}
};
($name:tt, $src:tt, $dst:tt) => {
paste::item! {
fn_migrator!($name, [<$src:camel Environment>], [<$dst:camel Environment>]);
fn_migrator!($name, [<auto_ $name>], [<$src:camel>], [<$src:camel Environment>], [<$dst:camel Environment>]);
}
};
}
pub struct Migrator;
impl Migrator {
fn_migrator!(migrate_lmdb_to_safe_mode, LmdbEnvironment, SafeModeEnvironment);
fn_migrator!(migrate_safe_mode_to_lmdb, SafeModeEnvironment, LmdbEnvironment);
fns_migrator!(lmdb, safe_mode);
}
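
With these two macros, `fns_migrator!(lmdb, safe_mode)` expands (via `paste`) into four public methods on `Migrator`: `migrate_lmdb_to_safe_mode`, `migrate_safe_mode_to_lmdb`, and the `auto_migrate_*` variants that open, migrate, and clean up the source themselves. The tests below exercise the `auto_` forms; calling a plain form directly looks roughly like this (a sketch with both environments opened by the caller at illustrative paths):

use std::path::Path;

use rkv::{
    backend::{
        Lmdb,
        SafeMode,
    },
    MigrateError,
    Migrator,
    Rkv,
};

fn copy_lmdb_into_safe_mode(src_path: &Path, dst_path: &Path) -> Result<(), MigrateError> {
    let src_env = Rkv::new::<Lmdb>(src_path)?;
    // The destination must still be empty, otherwise the migration errors out.
    let dst_env = Rkv::new::<SafeMode>(dst_path)?;
    // The generated functions accept anything that derefs to an Rkv, so plain
    // references work as well as RwLock read guards.
    Migrator::migrate_lmdb_to_safe_mode(&src_env, &dst_env)?;
    Ok(())
}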

@@ -18,7 +18,7 @@ use rkv::{
Lmdb,
SafeMode,
},
migrator::Migrator,
Migrator,
Rkv,
StoreOptions,
Value,
@@ -35,6 +35,152 @@ macro_rules! populate_store {
};
}
#[test]
fn test_simple_migrator_lmdb_to_safe() {
let root = Builder::new().prefix("test_simple_migrator_lmdb_to_safe").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
// Populate source environment and persist to disk.
{
let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
populate_store!(&src_env);
src_env.sync(true).expect("synced");
}
// Check if the files were written to disk.
{
let mut datamdb = root.path().to_path_buf();
let mut lockmdb = root.path().to_path_buf();
datamdb.push("data.mdb");
lockmdb.push("lock.mdb");
assert!(datamdb.exists());
assert!(lockmdb.exists());
}
// Verify that database was written to disk.
{
let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let store = src_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = src_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Easy migrate.
{
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
Migrator::auto_migrate_lmdb_to_safe_mode(root.path(), |builder| builder, &dst_env).expect("migrated");
}
// Verify that the database was indeed migrated.
{
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
let store = dst_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = dst_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Check if the old files were deleted from disk.
{
let mut datamdb = root.path().to_path_buf();
let mut lockmdb = root.path().to_path_buf();
datamdb.push("data.mdb");
lockmdb.push("lock.mdb");
assert!(!datamdb.exists());
assert!(!lockmdb.exists());
}
}
#[test]
fn test_simple_migrator_safe_to_lmdb() {
let root = Builder::new().prefix("test_simple_migrator_safe_to_lmdb").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
// Populate source environment and persist to disk.
{
let src_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
populate_store!(&src_env);
src_env.sync(true).expect("synced");
}
// Check if the files were written to disk.
{
let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");
assert!(safebin.exists());
}
// Verify that database was written to disk.
{
let src_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
let store = src_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = src_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Easy migrate.
{
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
Migrator::auto_migrate_safe_mode_to_lmdb(root.path(), |builder| builder, &dst_env).expect("migrated");
}
// Verify that the database was indeed migrated.
{
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let store = dst_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = dst_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Check if the old files were deleted from disk.
{
let mut safebin = root.path().to_path_buf();
safebin.push("data.safe.bin");
assert!(!safebin.exists());
}
}
#[test]
fn test_migrator_round_trip() {
let root = Builder::new().prefix("test_simple_migrator_lmdb_to_safe").tempdir().expect("tempdir");
fs::create_dir_all(root.path()).expect("dir created");
// Populate source environment and persist to disk.
{
let src_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
populate_store!(&src_env);
src_env.sync(true).expect("synced");
}
// Easy migrate.
{
let dst_env = Rkv::new::<SafeMode>(root.path()).expect("new succeeded");
Migrator::auto_migrate_lmdb_to_safe_mode(root.path(), |builder| builder, &dst_env).expect("migrated");
}
// Easy migrate back.
{
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
Migrator::auto_migrate_safe_mode_to_lmdb(root.path(), |builder| builder, &dst_env).expect("migrated");
}
// Verify that the database was indeed migrated twice.
{
let dst_env = Rkv::new::<Lmdb>(root.path()).expect("new succeeded");
let store = dst_env.open_single("store", StoreOptions::default()).expect("opened");
let reader = dst_env.read().expect("reader");
assert_eq!(store.get(&reader, "foo").expect("read"), Some(Value::I64(1234)));
assert_eq!(store.get(&reader, "bar").expect("read"), Some(Value::Bool(true)));
assert_eq!(store.get(&reader, "baz").expect("read"), Some(Value::Str("héllo, yöu")));
}
// Check if the right files are finally present on disk.
{
let mut datamdb = root.path().to_path_buf();
let mut lockmdb = root.path().to_path_buf();
let mut safebin = root.path().to_path_buf();
datamdb.push("data.mdb");
lockmdb.push("lock.mdb");
safebin.push("data.safe.bin");
assert!(datamdb.exists());
assert!(lockmdb.exists());
assert!(!safebin.exists());
}
}
#[test]
#[should_panic(expected = "new succeeded: EnvironmentDoesNotExistError")]
fn test_migrator_lmdb_to_safe_0() {
