From 44d1a5f04c95cde233fd26876d09e461a0e2f3b4 Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 8 Dec 2021 08:06:37 +0100 Subject: [PATCH] Makes bulk load partial insertions atomic Ensures that the stores does not end up in an inconsistent state. --- Cargo.lock | 12 ++++----- lib/benches/store.rs | 18 +++++++------- lib/src/storage/backend/rocksdb.rs | 40 +++++++++++++++++++++--------- lib/src/storage/mod.rs | 4 +-- lib/src/store.rs | 22 ++++++++++------ lib/tests/store.rs | 4 +-- python/src/store.rs | 2 +- rocksdb-sys/api/c.cc | 16 ++++++++++++ rocksdb-sys/api/c.h | 11 ++++++++ server/src/main.rs | 2 +- 10 files changed, 90 insertions(+), 41 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d387e3ac..6c49b686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -513,9 +513,9 @@ dependencies = [ [[package]] name = "itertools" -version = "0.10.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" dependencies = [ "either", ] @@ -958,9 +958,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" [[package]] name = "pkg-config" -version = "0.3.22" +version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12295df4f294471248581bc09bef3c38a5e46f1e36d6a37353621a0c6c357e1f" +checksum = "d1a3ea4f0dd7f1f3e512cf97bf100819aa547f36a6eccac8dbaae839eb92363e" [[package]] name = "plotters" @@ -1004,9 +1004,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" [[package]] name = "proc-macro2" -version = "1.0.32" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43" +checksum = "fb37d2df5df740e582f28f8560cf425f52bb267d872fe58358eadb554909f07a" dependencies = [ "unicode-xid", ] diff --git a/lib/benches/store.rs b/lib/benches/store.rs index e82ace72..e062583b 100644 --- a/lib/benches/store.rs +++ b/lib/benches/store.rs @@ -36,8 +36,8 @@ fn store_load(c: &mut Criterion) { group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| { b.iter(|| { let path = TempDir::default(); - let mut store = Store::open(&path.0).unwrap(); - do_bulk_load(&mut store, &data); + let store = Store::open(&path.0).unwrap(); + do_bulk_load(&store, &data); }) }); } @@ -54,8 +54,8 @@ fn store_load(c: &mut Criterion) { group.bench_function("load BSBM explore 10000 in on disk with bulk load", |b| { b.iter(|| { let path = TempDir::default(); - let mut store = Store::open(&path.0).unwrap(); - do_bulk_load(&mut store, &data); + let store = Store::open(&path.0).unwrap(); + do_bulk_load(&store, &data); }) }); } @@ -73,7 +73,7 @@ fn do_load(store: &Store, data: &[u8]) { store.optimize().unwrap(); } -fn do_bulk_load(store: &mut Store, data: &[u8]) { +fn do_bulk_load(store: &Store, data: &[u8]) { store .bulk_load_graph( Cursor::new(&data), @@ -116,8 +116,8 @@ fn store_query_and_update(c: &mut Criterion) { group.sample_size(10); { - let mut memory_store = Store::new().unwrap(); - do_bulk_load(&mut memory_store, &data); + let memory_store = Store::new().unwrap(); + do_bulk_load(&memory_store, &data); group.bench_function("BSBM explore 1000 query in memory", |b| { b.iter(|| run_operation(&memory_store, &query_operations)) }); @@ -128,8 +128,8 @@ fn store_query_and_update(c: &mut Criterion) { { let path = TempDir::default(); - let mut disk_store = Store::open(&path.0).unwrap(); - do_bulk_load(&mut disk_store, &data); + let disk_store = Store::open(&path.0).unwrap(); + do_bulk_load(&disk_store, &data); group.bench_function("BSBM explore 1000 query on disk", |b| { b.iter(|| run_operation(&disk_store, &query_operations)) }); diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index f3c9d874..ed992b68 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -10,6 +10,7 @@ use libc::{self, c_char, c_void, free}; use oxrocksdb_sys::*; use rand::random; use std::borrow::Borrow; +use std::collections::HashMap; use std::env::temp_dir; use std::ffi::{CStr, CString}; use std::fs::remove_dir_all; @@ -438,25 +439,40 @@ impl Db { } } - pub fn insert_stt_files(&self, ssts_for_cf: Vec<(&ColumnFamily, PathBuf)>) -> Result<()> { + pub fn insert_stt_files(&self, ssts_for_cf: &[(&ColumnFamily, PathBuf)]) -> Result<()> { + let mut paths_by_cf = HashMap::<_, Vec<_>>::new(); for (cf, path) in ssts_for_cf { - unsafe { - ffi_result!(rocksdb_transactiondb_ingest_external_file_cf( - self.0.db, - cf.0, - &path_to_cstring(&path)?.as_ptr(), - 1, - self.0.ingest_external_file_options - ))?; - } + paths_by_cf + .entry(*cf) + .or_default() + .push(path_to_cstring(path)?); + } + let cpaths_by_cf = paths_by_cf + .iter() + .map(|(cf, paths)| (*cf, paths.iter().map(|p| p.as_ptr()).collect::>())) + .collect::>(); + let args = cpaths_by_cf + .iter() + .map(|(cf, p)| rocksdb_ingestexternalfilearg_t { + column_family: cf.0, + external_files: p.as_ptr(), + external_files_len: p.len(), + options: self.0.ingest_external_file_options, + }) + .collect::>(); + unsafe { + ffi_result!(rocksdb_transactiondb_ingest_external_files( + self.0.db, + args.as_ptr(), + args.len() + )) } - Ok(()) } } // It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope. // So, no use after free possible. -#[derive(Clone)] +#[derive(Clone, Eq, PartialEq, Hash)] pub struct ColumnFamily(*mut rocksdb_column_family_handle_t); unsafe impl Send for ColumnFamily {} diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index a7e780ac..3b7df5b7 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -172,7 +172,7 @@ impl Storage { stt_file.insert_empty(&k)?; } self.db - .insert_stt_files(vec![(&self.graphs_cf, stt_file.finish()?)])?; + .insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?; version = 1; self.update_version(version)?; } @@ -1172,7 +1172,7 @@ impl BulkLoader { self.quads.clear(); } - self.storage.db.insert_stt_files(to_load) + self.storage.db.insert_stt_files(&to_load) } fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> { diff --git a/lib/src/store.rs b/lib/src/store.rs index cc5bafce..882069f6 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -612,7 +612,9 @@ impl Store { /// /// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient. /// - /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// Warning: This method is not atomic. + /// If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// Results might get weird if you delete data during the loading process. /// /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. /// @@ -622,7 +624,7 @@ impl Store { /// use oxigraph::io::DatasetFormat; /// use oxigraph::model::*; /// - /// let mut store = Store::new()?; + /// let store = Store::new()?; /// /// // insertion /// let file = b" ."; @@ -639,7 +641,7 @@ impl Store { /// Errors related to data loading into the store use the other error kinds. #[cfg(not(target_arch = "wasm32"))] pub fn bulk_load_dataset( - &mut self, + &self, reader: impl BufRead, format: DatasetFormat, base_iri: Option<&str>, @@ -657,7 +659,9 @@ impl Store { /// /// This function is optimized for large dataset loading speed. For small files, [`load_graph`](Store::load_graph) might be more convenient. /// - /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// Warning: This method is not atomic. + /// If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// Results might get weird if you delete data during the loading process. /// /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. /// @@ -667,7 +671,7 @@ impl Store { /// use oxigraph::io::GraphFormat; /// use oxigraph::model::*; /// - /// let mut store = Store::new()?; + /// let store = Store::new()?; /// /// // insertion /// let file = b" ."; @@ -684,7 +688,7 @@ impl Store { /// Errors related to data loading into the store use the other error kinds. #[cfg(not(target_arch = "wasm32"))] pub fn bulk_load_graph<'a>( - &mut self, + &self, reader: impl BufRead, format: GraphFormat, to_graph_name: impl Into>, @@ -707,11 +711,13 @@ impl Store { /// Adds a set of triples to this store using bulk load. /// - /// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. + /// Warning: This method is not atomic. + /// If the process fails in the middle of the file, only a part of the data may be written to the store. + /// Results might get weird if you delete data during the loading process. /// /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. #[cfg(not(target_arch = "wasm32"))] - pub fn bulk_extend(&mut self, quads: impl IntoIterator) -> io::Result<()> { + pub fn bulk_extend(&self, quads: impl IntoIterator) -> io::Result<()> { bulk_load(&self.storage, quads.into_iter().map(Ok)) } } diff --git a/lib/tests/store.rs b/lib/tests/store.rs index 55c5fe6d..1376bf83 100644 --- a/lib/tests/store.rs +++ b/lib/tests/store.rs @@ -100,7 +100,7 @@ fn test_load_dataset() -> Result<()> { #[test] fn test_bulk_load_dataset() -> Result<()> { - let mut store = Store::new().unwrap(); + let store = Store::new().unwrap(); store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; for q in quads(GraphNameRef::DefaultGraph) { assert!(store.contains(q)?); @@ -183,7 +183,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<()> { NamedNodeRef::new_unchecked("http://example.com/o"), NamedNodeRef::new_unchecked("http://example.com/g"), ); - let mut store = Store::new()?; + let store = Store::new()?; store.remove(quad)?; store.bulk_extend([quad.into_owned()])?; assert_eq!(store.len()?, 1); diff --git a/python/src/store.rs b/python/src/store.rs index 7511a4aa..b9fd62c8 100644 --- a/python/src/store.rs +++ b/python/src/store.rs @@ -349,7 +349,7 @@ impl PyStore { #[pyo3(text_signature = "($self, data, /, mime_type, *, base_iri = None, to_graph = None)")] #[args(input, mime_type, "*", base_iri = "None", to_graph = "None")] fn bulk_load( - &mut self, + &self, input: PyObject, mime_type: &str, base_iri: Option<&str>, diff --git a/rocksdb-sys/api/c.cc b/rocksdb-sys/api/c.cc index e1e8443f..8c316201 100644 --- a/rocksdb-sys/api/c.cc +++ b/rocksdb-sys/api/c.cc @@ -53,6 +53,22 @@ void rocksdb_transactiondb_ingest_external_file_cf( SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep)); } +void rocksdb_transactiondb_ingest_external_files( + rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list, + const size_t list_len, char** errptr) { + std::vector args(list_len); + for (size_t i = 0; i < list_len; ++i) { + args[i].column_family = list[i].column_family->rep; + std::vector files(list[i].external_files_len); + for (size_t j = 0; j < list[i].external_files_len; ++j) { + files[j] = std::string(list[i].external_files[j]); + } + args[i].external_files = files; + args[i].options = list[i].options->rep; + } + SaveError(errptr, db->rep->IngestExternalFiles(args)); +} + rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf( rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_column_family_handle_t* column_family, const char* key, diff --git a/rocksdb-sys/api/c.h b/rocksdb-sys/api/c.h index 74c492a7..7d38290f 100644 --- a/rocksdb-sys/api/c.h +++ b/rocksdb-sys/api/c.h @@ -6,6 +6,13 @@ extern "C" { #endif +typedef struct rocksdb_ingestexternalfilearg_t { + rocksdb_column_family_handle_t* column_family; + char const* const* external_files; + size_t external_files_len; + rocksdb_ingestexternalfileoptions_t* options; +} rocksdb_ingestexternalfilearg_t; + extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, rocksdb_column_family_handle_t* column_family, const char* key, @@ -25,6 +32,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_file_cf( const char* const* file_list, const size_t list_len, const rocksdb_ingestexternalfileoptions_t* opt, char** errptr); +extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_files( + rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list, + const size_t list_len, char** errptr); + extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf( rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_column_family_handle_t* column_family, const char* key, diff --git a/server/src/main.rs b/server/src/main.rs index 38f34b64..71282ce0 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -69,7 +69,7 @@ pub fn main() -> std::io::Result<()> { ) .get_matches(); - let mut store = if let Some(path) = matches.value_of_os("location") { + let store = if let Some(path) = matches.value_of_os("location") { Store::open(path) } else { Store::new()