Makes bulk load partial insertions atomic

Ensures that the store does not end up in an inconsistent state.
pull/174/head
Tpt 3 years ago
parent ff56a8c5eb
commit 44d1a5f04c
  1. 12
      Cargo.lock
  2. 18
      lib/benches/store.rs
  3. 36
      lib/src/storage/backend/rocksdb.rs
  4. 4
      lib/src/storage/mod.rs
  5. 22
      lib/src/store.rs
  6. 4
      lib/tests/store.rs
  7. 2
      python/src/store.rs
  8. 16
      rocksdb-sys/api/c.cc
  9. 11
      rocksdb-sys/api/c.h
  10. 2
      server/src/main.rs

12
Cargo.lock generated

@ -513,9 +513,9 @@ dependencies = [
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.10.1" version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69ddb889f9d0d08a67338271fa9b62996bc788c7796a5c18cf057420aaed5eaf" checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
dependencies = [ dependencies = [
"either", "either",
] ]
@ -958,9 +958,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]] [[package]]
name = "pkg-config" name = "pkg-config"
version = "0.3.22" version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12295df4f294471248581bc09bef3c38a5e46f1e36d6a37353621a0c6c357e1f" checksum = "d1a3ea4f0dd7f1f3e512cf97bf100819aa547f36a6eccac8dbaae839eb92363e"
[[package]] [[package]]
name = "plotters" name = "plotters"
@ -1004,9 +1004,9 @@ checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.32" version = "1.0.33"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba508cc11742c0dc5c1659771673afbab7a0efab23aa17e854cbab0837ed0b43" checksum = "fb37d2df5df740e582f28f8560cf425f52bb267d872fe58358eadb554909f07a"
dependencies = [ dependencies = [
"unicode-xid", "unicode-xid",
] ]

@ -36,8 +36,8 @@ fn store_load(c: &mut Criterion) {
group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| { group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| {
b.iter(|| { b.iter(|| {
let path = TempDir::default(); let path = TempDir::default();
let mut store = Store::open(&path.0).unwrap(); let store = Store::open(&path.0).unwrap();
do_bulk_load(&mut store, &data); do_bulk_load(&store, &data);
}) })
}); });
} }
@ -54,8 +54,8 @@ fn store_load(c: &mut Criterion) {
group.bench_function("load BSBM explore 10000 in on disk with bulk load", |b| { group.bench_function("load BSBM explore 10000 in on disk with bulk load", |b| {
b.iter(|| { b.iter(|| {
let path = TempDir::default(); let path = TempDir::default();
let mut store = Store::open(&path.0).unwrap(); let store = Store::open(&path.0).unwrap();
do_bulk_load(&mut store, &data); do_bulk_load(&store, &data);
}) })
}); });
} }
@ -73,7 +73,7 @@ fn do_load(store: &Store, data: &[u8]) {
store.optimize().unwrap(); store.optimize().unwrap();
} }
fn do_bulk_load(store: &mut Store, data: &[u8]) { fn do_bulk_load(store: &Store, data: &[u8]) {
store store
.bulk_load_graph( .bulk_load_graph(
Cursor::new(&data), Cursor::new(&data),
@ -116,8 +116,8 @@ fn store_query_and_update(c: &mut Criterion) {
group.sample_size(10); group.sample_size(10);
{ {
let mut memory_store = Store::new().unwrap(); let memory_store = Store::new().unwrap();
do_bulk_load(&mut memory_store, &data); do_bulk_load(&memory_store, &data);
group.bench_function("BSBM explore 1000 query in memory", |b| { group.bench_function("BSBM explore 1000 query in memory", |b| {
b.iter(|| run_operation(&memory_store, &query_operations)) b.iter(|| run_operation(&memory_store, &query_operations))
}); });
@ -128,8 +128,8 @@ fn store_query_and_update(c: &mut Criterion) {
{ {
let path = TempDir::default(); let path = TempDir::default();
let mut disk_store = Store::open(&path.0).unwrap(); let disk_store = Store::open(&path.0).unwrap();
do_bulk_load(&mut disk_store, &data); do_bulk_load(&disk_store, &data);
group.bench_function("BSBM explore 1000 query on disk", |b| { group.bench_function("BSBM explore 1000 query on disk", |b| {
b.iter(|| run_operation(&disk_store, &query_operations)) b.iter(|| run_operation(&disk_store, &query_operations))
}); });

@ -10,6 +10,7 @@ use libc::{self, c_char, c_void, free};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use rand::random; use rand::random;
use std::borrow::Borrow; use std::borrow::Borrow;
use std::collections::HashMap;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::fs::remove_dir_all; use std::fs::remove_dir_all;
@ -438,25 +439,40 @@ impl Db {
} }
} }
pub fn insert_stt_files(&self, ssts_for_cf: Vec<(&ColumnFamily, PathBuf)>) -> Result<()> { pub fn insert_stt_files(&self, ssts_for_cf: &[(&ColumnFamily, PathBuf)]) -> Result<()> {
let mut paths_by_cf = HashMap::<_, Vec<_>>::new();
for (cf, path) in ssts_for_cf { for (cf, path) in ssts_for_cf {
paths_by_cf
.entry(*cf)
.or_default()
.push(path_to_cstring(path)?);
}
let cpaths_by_cf = paths_by_cf
.iter()
.map(|(cf, paths)| (*cf, paths.iter().map(|p| p.as_ptr()).collect::<Vec<_>>()))
.collect::<Vec<_>>();
let args = cpaths_by_cf
.iter()
.map(|(cf, p)| rocksdb_ingestexternalfilearg_t {
column_family: cf.0,
external_files: p.as_ptr(),
external_files_len: p.len(),
options: self.0.ingest_external_file_options,
})
.collect::<Vec<_>>();
unsafe { unsafe {
ffi_result!(rocksdb_transactiondb_ingest_external_file_cf( ffi_result!(rocksdb_transactiondb_ingest_external_files(
self.0.db, self.0.db,
cf.0, args.as_ptr(),
&path_to_cstring(&path)?.as_ptr(), args.len()
1, ))
self.0.ingest_external_file_options
))?;
}
} }
Ok(())
} }
} }
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope. // It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
// So, no use after free possible. // So, no use after free possible.
#[derive(Clone)] #[derive(Clone, Eq, PartialEq, Hash)]
pub struct ColumnFamily(*mut rocksdb_column_family_handle_t); pub struct ColumnFamily(*mut rocksdb_column_family_handle_t);
unsafe impl Send for ColumnFamily {} unsafe impl Send for ColumnFamily {}

@ -172,7 +172,7 @@ impl Storage {
stt_file.insert_empty(&k)?; stt_file.insert_empty(&k)?;
} }
self.db self.db
.insert_stt_files(vec![(&self.graphs_cf, stt_file.finish()?)])?; .insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?;
version = 1; version = 1;
self.update_version(version)?; self.update_version(version)?;
} }
@ -1172,7 +1172,7 @@ impl BulkLoader {
self.quads.clear(); self.quads.clear();
} }
self.storage.db.insert_stt_files(to_load) self.storage.db.insert_stt_files(&to_load)
} }
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> { fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) -> Result<()> {

@ -612,7 +612,9 @@ impl Store {
/// ///
/// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient. /// This function is optimized for large dataset loading speed. For small files, [`load_dataset`](Store::load_dataset) might be more convenient.
/// ///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. /// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
/// ///
@ -622,7 +624,7 @@ impl Store {
/// use oxigraph::io::DatasetFormat; /// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*; /// use oxigraph::model::*;
/// ///
/// let mut store = Store::new()?; /// let store = Store::new()?;
/// ///
/// // insertion /// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> ."; /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
@ -639,7 +641,7 @@ impl Store {
/// Errors related to data loading into the store use the other error kinds. /// Errors related to data loading into the store use the other error kinds.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load_dataset( pub fn bulk_load_dataset(
&mut self, &self,
reader: impl BufRead, reader: impl BufRead,
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
@ -657,7 +659,9 @@ impl Store {
/// ///
/// This function is optimized for large dataset loading speed. For small files, [`load_graph`](Store::load_graph) might be more convenient. /// This function is optimized for large dataset loading speed. For small files, [`load_graph`](Store::load_graph) might be more convenient.
/// ///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. /// Warning: This method is not atomic.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
/// ///
@ -667,7 +671,7 @@ impl Store {
/// use oxigraph::io::GraphFormat; /// use oxigraph::io::GraphFormat;
/// use oxigraph::model::*; /// use oxigraph::model::*;
/// ///
/// let mut store = Store::new()?; /// let store = Store::new()?;
/// ///
/// // insertion /// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> ."; /// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
@ -684,7 +688,7 @@ impl Store {
/// Errors related to data loading into the store use the other error kinds. /// Errors related to data loading into the store use the other error kinds.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn bulk_load_graph<'a>( pub fn bulk_load_graph<'a>(
&mut self, &self,
reader: impl BufRead, reader: impl BufRead,
format: GraphFormat, format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>, to_graph_name: impl Into<GraphNameRef<'a>>,
@ -707,11 +711,13 @@ impl Store {
/// Adds a set of triples to this store using bulk load. /// Adds a set of triples to this store using bulk load.
/// ///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store. /// Warning: This method is not atomic.
/// If the process fails in the middle of the file, only a part of the data may be written to the store.
/// Results might get weird if you delete data during the loading process.
/// ///
/// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files. /// Warning: This method is optimized for speed. It uses multiple threads and multiple GBs of RAM on large files.
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn bulk_extend(&mut self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> { pub fn bulk_extend(&self, quads: impl IntoIterator<Item = Quad>) -> io::Result<()> {
bulk_load(&self.storage, quads.into_iter().map(Ok)) bulk_load(&self.storage, quads.into_iter().map(Ok))
} }
} }

@ -100,7 +100,7 @@ fn test_load_dataset() -> Result<()> {
#[test] #[test]
fn test_bulk_load_dataset() -> Result<()> { fn test_bulk_load_dataset() -> Result<()> {
let mut store = Store::new().unwrap(); let store = Store::new().unwrap();
store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?; store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?;
for q in quads(GraphNameRef::DefaultGraph) { for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?); assert!(store.contains(q)?);
@ -183,7 +183,7 @@ fn test_bulk_load_on_existing_delete_overrides_the_delete() -> Result<()> {
NamedNodeRef::new_unchecked("http://example.com/o"), NamedNodeRef::new_unchecked("http://example.com/o"),
NamedNodeRef::new_unchecked("http://example.com/g"), NamedNodeRef::new_unchecked("http://example.com/g"),
); );
let mut store = Store::new()?; let store = Store::new()?;
store.remove(quad)?; store.remove(quad)?;
store.bulk_extend([quad.into_owned()])?; store.bulk_extend([quad.into_owned()])?;
assert_eq!(store.len()?, 1); assert_eq!(store.len()?, 1);

@ -349,7 +349,7 @@ impl PyStore {
#[pyo3(text_signature = "($self, data, /, mime_type, *, base_iri = None, to_graph = None)")] #[pyo3(text_signature = "($self, data, /, mime_type, *, base_iri = None, to_graph = None)")]
#[args(input, mime_type, "*", base_iri = "None", to_graph = "None")] #[args(input, mime_type, "*", base_iri = "None", to_graph = "None")]
fn bulk_load( fn bulk_load(
&mut self, &self,
input: PyObject, input: PyObject,
mime_type: &str, mime_type: &str,
base_iri: Option<&str>, base_iri: Option<&str>,

@ -53,6 +53,22 @@ void rocksdb_transactiondb_ingest_external_file_cf(
SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep)); SaveError(errptr, db->rep->IngestExternalFile(handle->rep, files, opt->rep));
} }
// Ingests external files into several column families with a single call to
// DB::IngestExternalFiles (which the RocksDB documentation describes as
// recording the ingestion for all column families atomically).
//
// `list` points to `list_len` entries. Each entry names a column family, an
// array of `external_files_len` NUL-terminated file paths, and the ingestion
// options to use for that column family.
// The resulting status is reported through `errptr` via SaveError.
void rocksdb_transactiondb_ingest_external_files(
rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list,
const size_t list_len, char** errptr) {
// Translate every C argument struct into a rocksdb::IngestExternalFileArg.
std::vector<rocksdb::IngestExternalFileArg> args(list_len);
for (size_t i = 0; i < list_len; ++i) {
args[i].column_family = list[i].column_family->rep;
// Copy the C strings into std::string values so the C++ argument owns its paths.
std::vector<std::string> files(list[i].external_files_len);
for (size_t j = 0; j < list[i].external_files_len; ++j) {
files[j] = std::string(list[i].external_files[j]);
}
args[i].external_files = files;
args[i].options = list[i].options->rep;
}
// One call for all column families; the status is forwarded to the caller.
SaveError(errptr, db->rep->IngestExternalFiles(args));
}
rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf( rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,

@ -6,6 +6,13 @@
extern "C" { extern "C" {
#endif #endif
/* One ingestion request for a single column family, as consumed by
   rocksdb_transactiondb_ingest_external_files. */
typedef struct rocksdb_ingestexternalfilearg_t {
/* Column family that receives the files. */
rocksdb_column_family_handle_t* column_family;
/* Array of `external_files_len` NUL-terminated file paths. */
char const* const* external_files;
/* Number of entries in `external_files`. */
size_t external_files_len;
/* Ingestion options applied to this column family. */
rocksdb_ingestexternalfileoptions_t* options;
} rocksdb_ingestexternalfilearg_t;
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf( extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transactiondb_get_pinned_cf(
rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options, rocksdb_transactiondb_t* db, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,
@ -25,6 +32,10 @@ extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_file_cf(
const char* const* file_list, const size_t list_len, const char* const* file_list, const size_t list_len,
const rocksdb_ingestexternalfileoptions_t* opt, char** errptr); const rocksdb_ingestexternalfileoptions_t* opt, char** errptr);
/* Ingests the files described by the `list_len` entries of `list` (one entry
   per column family) in a single call. On failure, an error message is
   stored in `*errptr`. */
extern ROCKSDB_LIBRARY_API void rocksdb_transactiondb_ingest_external_files(
rocksdb_transactiondb_t* db, const rocksdb_ingestexternalfilearg_t* list,
const size_t list_len, char** errptr);
extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf( extern ROCKSDB_LIBRARY_API rocksdb_pinnableslice_t* rocksdb_transaction_get_pinned_cf(
rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options, rocksdb_transaction_t* txn, const rocksdb_readoptions_t* options,
rocksdb_column_family_handle_t* column_family, const char* key, rocksdb_column_family_handle_t* column_family, const char* key,

@ -69,7 +69,7 @@ pub fn main() -> std::io::Result<()> {
) )
.get_matches(); .get_matches();
let mut store = if let Some(path) = matches.value_of_os("location") { let store = if let Some(path) = matches.value_of_os("location") {
Store::open(path) Store::open(path)
} else { } else {
Store::new() Store::new()

Loading…
Cancel
Save