Adds a backup system

RocksDB provides a great cheap backup feature thanks to the immutable SST storage.

Sadly it is not compatible with in-memory databases which do not rely on the SST files.
pull/190/head
Tpt 3 years ago
parent 34cc602e0b
commit 203bd4c080
  1. 23
      lib/src/storage/backend/rocksdb.rs
  2. 5
      lib/src/storage/mod.rs
  3. 21
      lib/src/store.rs
  4. 38
      lib/tests/store.rs
  5. 19
      rocksdb-sys/api/c.cc
  6. 3
      rocksdb-sys/api/c.h

@ -101,7 +101,7 @@ struct DbHandler {
cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>,
path: PathBuf,
remove_path: bool,
in_memory: bool,
}
impl Drop for DbHandler {
@ -125,7 +125,7 @@ impl Drop for DbHandler {
rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options);
}
if self.remove_path && self.path.exists() {
if self.in_memory && self.path.exists() {
remove_dir_all(&self.path).unwrap();
}
}
@ -334,7 +334,7 @@ impl Db {
cf_handles,
cf_options,
path,
remove_path: in_memory,
in_memory,
})
}
}
@ -564,6 +564,23 @@ impl Db {
}
Ok(())
}
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
if self.0.in_memory {
return Err(StorageError::Other(
"It is not possible to backup an in-memory database created with `Store::open`"
.into(),
));
}
let path = path_to_cstring(target_directory)?;
unsafe {
ffi_result!(rocksdb_transactiondb_create_checkpoint_with_status(
self.0.db,
path.as_ptr()
))?;
}
Ok(())
}
}
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.

@ -272,6 +272,11 @@ impl Storage {
self.db.compact(&self.dosp_cf)?;
self.db.compact(&self.id2str_cf)
}
#[cfg(not(target_arch = "wasm32"))]
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
self.db.backup(target_directory)
}
}
pub struct StorageReader {

@ -733,6 +733,27 @@ impl Store {
self.storage.compact()
}
/// Creates database backup into the `target_directory`.
///
/// After its creation, the backup is usable using [`Store::open`]
/// like a regular Oxigraph database and operates independently from the original database.
///
/// Warning: Backups are only possible for on-disk databases created using [`Store::open`].
/// Temporary in-memory databases created using [`Store::new`] are not compatible with RocksDB backup system.
///
/// Warning: An error is raised if the `target_directory` already exists.
///
/// If the target directory is in the same file system as the current database,
/// the database content will not be fully copied
/// but hard links will be used to point to the original database immutable snapshots.
/// This allows cheap regular backups.
///
/// If you want to move your data to an other RDF storage system, you should have a look at the [`Store::dump_dataset`] function instead.
#[cfg(not(target_arch = "wasm32"))]
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
self.storage.backup(target_directory)
}
/// Loads a dataset file efficiently into the store.
///
/// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient.

@ -227,6 +227,44 @@ fn test_bad_stt_open() -> Result<(), Box<dyn Error>> {
Ok(())
}
#[test]
fn test_backup() -> Result<(), Box<dyn Error>> {
let quad = QuadRef {
subject: NamedNodeRef::new_unchecked("http://example.com/s").into(),
predicate: NamedNodeRef::new_unchecked("http://example.com/p"),
object: NamedNodeRef::new_unchecked("http://example.com/o").into(),
graph_name: GraphNameRef::DefaultGraph,
};
let store_dir = TempDir::default();
let backup_dir = TempDir::default();
let store = Store::open(&store_dir.0)?;
store.insert(quad)?;
store.backup(&backup_dir.0)?;
store.remove(quad)?;
assert!(!store.contains(quad)?);
assert!(Store::open(&backup_dir.0)?.contains(quad)?);
Ok(())
}
#[test]
fn test_bad_backup() -> Result<(), Box<dyn Error>> {
let store_dir = TempDir::default();
let backup_dir = TempDir::default();
create_dir(&backup_dir.0)?;
assert!(Store::open(&store_dir.0)?.backup(&backup_dir.0).is_err());
Ok(())
}
#[test]
fn test_backup_on_in_memory() -> Result<(), Box<dyn Error>> {
let backup_dir = TempDir::default();
assert!(Store::new()?.backup(&backup_dir.0).is_err());
Ok(())
}
#[test]
#[cfg(target_os = "linux")]
fn test_backward_compatibility() -> Result<(), Box<dyn Error>> {

@ -55,7 +55,7 @@ rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf_with_status(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
rocksdb_pinnableslice_t* v = new rocksdb_pinnableslice_t;
Status s = db->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {
@ -116,6 +116,19 @@ void rocksdb_transactiondb_ingest_external_files_with_status(
SaveStatus(statusptr, db->rep->IngestExternalFiles(args));
}
void rocksdb_transactiondb_create_checkpoint_with_status(
rocksdb_transactiondb_t* db, const char* checkpoint_dir,
rocksdb_status_t* statusptr) {
Checkpoint* checkpoint;
Status s = Checkpoint::Create(db->rep, &checkpoint);
if (!s.ok()) {
SaveStatus(statusptr, s);
return;
}
SaveStatus(statusptr, checkpoint->CreateCheckpoint(std::string(checkpoint_dir)));
delete checkpoint;
}
void rocksdb_transaction_commit_with_status(rocksdb_transaction_t* txn, rocksdb_status_t* statusptr) {
SaveStatus(statusptr, txn->rep->Commit());
@ -129,7 +142,7 @@ rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf_with_status(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
rocksdb_pinnableslice_t* v = new rocksdb_pinnableslice_t;
Status s = txn->rep->Get(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {
@ -146,7 +159,7 @@ rocksdb_pinnableslice_t* rocksdb_transaction_get_for_update_pinned_cf_with_statu
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, rocksdb_status_t* statusptr) {
rocksdb_pinnableslice_t* v = new (rocksdb_pinnableslice_t);
rocksdb_pinnableslice_t* v = new rocksdb_pinnableslice_t;
Status s = txn->rep->GetForUpdate(options->rep, column_family->rep, Slice(key, keylen),
&v->rep);
if (!s.ok()) {

@ -96,6 +96,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_files_with
rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list,
const size_t list_len, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_create_checkpoint_with_status(
rocksdb_transactiondb_t* db, const char* checkpoint_dir, rocksdb_status_t* statusptr);
extern ROCKSDB_LIBRARY_API void rocksdb_transaction_commit_with_status(
rocksdb_transaction_t* txn, rocksdb_status_t* statusptr);

Loading…
Cancel
Save