diff --git a/lib/benches/store.rs b/lib/benches/store.rs
index b6a30981..cf8d075b 100644
--- a/lib/benches/store.rs
+++ b/lib/benches/store.rs
@@ -11,34 +11,64 @@ use std::io::{BufRead, BufReader, Cursor, Read};
 use std::path::{Path, PathBuf};
 
 fn store_load(c: &mut Criterion) {
-    let mut data = Vec::new();
-    read_data("explore-1000.nt.zst")
-        .read_to_end(&mut data)
-        .unwrap();
+    {
+        let mut data = Vec::new();
+        read_data("explore-1000.nt.zst")
+            .read_to_end(&mut data)
+            .unwrap();
+
+        let mut group = c.benchmark_group("store load");
+        group.throughput(Throughput::Bytes(data.len() as u64));
+        group.sample_size(10);
+        group.bench_function("load BSBM explore 1000 in memory", |b| {
+            b.iter(|| {
+                let store = Store::new().unwrap();
+                do_load(&store, &data);
+            })
+        });
+        group.bench_function("load BSBM explore 1000 on disk", |b| {
+            b.iter(|| {
+                let path = TempDir::default();
+                let store = Store::open(&path.0).unwrap();
+                do_load(&store, &data);
+            })
+        });
+        group.bench_function("load BSBM explore 1000 on disk with bulk load", |b| {
+            b.iter(|| {
+                let path = TempDir::default();
+                Store::create_from_dataset(
+                    &path.0,
+                    Cursor::new(&data),
+                    DatasetFormat::NQuads,
+                    None,
+                )
+                .unwrap();
+            })
+        });
+    }
 
-    let mut group = c.benchmark_group("store load");
-    group.throughput(Throughput::Bytes(data.len() as u64));
-    group.sample_size(10);
-    group.bench_function("load BSBM explore 1000 in memory", |b| {
-        b.iter(|| {
-            let store = Store::new().unwrap();
-            do_load(&store, &data);
-        })
-    });
-    group.bench_function("load BSBM explore 1000 in on disk", |b| {
-        b.iter(|| {
-            let path = TempDir::default();
-            let store = Store::open(&path.0).unwrap();
-            do_load(&store, &data);
-        })
-    });
-    group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| {
-        b.iter(|| {
-            let path = TempDir::default();
-            Store::create_from_dataset(&path.0, Cursor::new(&data), DatasetFormat::NQuads, None)
+    {
+        let mut data = Vec::new();
+        read_data("explore-10000.nt.zst")
+            .read_to_end(&mut data)
+            .unwrap();
+
+        let mut group = c.benchmark_group("store load large");
+        group.throughput(Throughput::Bytes(data.len() as u64));
+        group.sample_size(10);
+        group.bench_function("load BSBM explore 10000 on disk with bulk load", |b| {
+            b.iter(|| {
+                let path = TempDir::default();
+                Store::create_from_dataset(
+                    &path.0,
+                    Cursor::new(&data),
+                    DatasetFormat::NQuads,
+                    None,
+                )
                 .unwrap();
-        })
-    });
+            })
+        });
+    }
 }
 
 fn do_load(store: &Store, data: &[u8]) {
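The benchmarks above rely on a `read_data` helper that lives elsewhere in the bench file and is not part of this diff. For readers following along, a minimal sketch of what such a helper can look like, assuming the `.nt.zst` files are already available locally and the `zstd` crate is a dependency (the on-disk location below is an assumption, not the crate's actual code):

```rust
use std::fs::File;
use std::io::Read;
use std::path::Path;

// Hypothetical stand-in for the benches' `read_data` helper (not shown in this
// diff): opens a zstd-compressed benchmark file and returns a reader that
// decompresses on the fly.
fn read_data(file: &str) -> impl Read {
    let path = Path::new("benches").join(file); // assumed location of the data files
    zstd::Decoder::new(File::open(path).unwrap()).unwrap()
}
```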
diff --git a/lib/src/storage/backend/mod.rs b/lib/src/storage/backend/mod.rs
index 5e275c9e..b05bf2e3 100644
--- a/lib/src/storage/backend/mod.rs
+++ b/lib/src/storage/backend/mod.rs
@@ -7,7 +7,8 @@ pub use fallback::{
 };
 #[cfg(not(target_arch = "wasm32"))]
 pub use rocksdb::{
-    ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex,
+    ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, SstFileWriter,
+    WriteBatchWithIndex,
 };
 
 use std::ffi::CString;
diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs
index 92537bca..86db92b7 100644
--- a/lib/src/storage/backend/rocksdb.rs
+++ b/lib/src/storage/backend/rocksdb.rs
@@ -8,13 +8,14 @@ use crate::error::invalid_input_error;
 use crate::storage::backend::{CompactionAction, CompactionFilter};
 use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t};
 use oxrocksdb_sys::*;
+use rand::random;
 use std::borrow::Borrow;
 use std::env::temp_dir;
 use std::ffi::{CStr, CString};
 use std::io::{Error, ErrorKind, Result};
 use std::iter::Zip;
 use std::ops::Deref;
-use std::path::Path;
+use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::{ptr, slice};
 
@@ -60,6 +61,8 @@ struct DbHandler {
     read_options: *mut rocksdb_readoptions_t,
     write_options: *mut rocksdb_writeoptions_t,
     flush_options: *mut rocksdb_flushoptions_t,
+    env_options: *mut rocksdb_envoptions_t,
+    ingest_external_file_options: *mut rocksdb_ingestexternalfileoptions_t,
     compaction_options: *mut rocksdb_compactoptions_t,
     block_based_table_options: *mut rocksdb_block_based_table_options_t,
     env: Option<*mut rocksdb_env_t>,
@@ -67,6 +70,7 @@ struct DbHandler {
     cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
     cf_options: Vec<*mut rocksdb_options_t>,
     cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>,
+    path: PathBuf,
 }
 
 impl Drop for DbHandler {
@@ -82,6 +86,8 @@ impl Drop for DbHandler {
             rocksdb_readoptions_destroy(self.read_options);
             rocksdb_writeoptions_destroy(self.write_options);
             rocksdb_flushoptions_destroy(self.flush_options);
+            rocksdb_envoptions_destroy(self.env_options);
+            rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options);
             rocksdb_compactoptions_destroy(self.compaction_options);
             rocksdb_options_destroy(self.options);
             rocksdb_block_based_options_destroy(self.block_based_table_options);
@@ -159,6 +165,7 @@
         );
         if for_bulk_load {
             rocksdb_options_prepare_for_bulk_load(options);
+            rocksdb_options_set_error_if_exists(options, 1);
        }
         let block_based_table_options = rocksdb_block_based_options_create();
         assert!(
@@ -292,6 +299,18 @@
             "rocksdb_flushoptions_create returned null"
         );
 
+        let env_options = rocksdb_envoptions_create();
+        assert!(
+            !env_options.is_null(),
+            "rocksdb_envoptions_create returned null"
+        );
+
+        let ingest_external_file_options = rocksdb_ingestexternalfileoptions_create();
+        assert!(
+            !ingest_external_file_options.is_null(),
+            "rocksdb_ingestexternalfileoptions_create returned null"
+        );
+
         let compaction_options = rocksdb_compactoptions_create();
         assert!(
             !compaction_options.is_null(),
@@ -304,6 +323,8 @@
             read_options,
             write_options,
             flush_options,
+            env_options,
+            ingest_external_file_options,
             compaction_options,
             block_based_table_options,
             env,
@@ -311,6 +332,7 @@
             cf_handles,
             cf_options,
             cf_compaction_filters,
+            path: path.to_path_buf(),
         })
     }
 }
@@ -464,6 +486,41 @@ impl Db {
         iter.status()?; // We makes sure there is no read problem
         Ok(!iter.is_valid())
     }
+
+    pub fn new_sst_file(&self) -> Result<SstFileWriter> {
+        unsafe {
+            let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
+            let cpath = CString::new(
+                self.0
+                    .path
+                    .join(random::<u128>().to_string())
+                    .to_string_lossy()
+                    .to_string(),
+            )
+            .map_err(invalid_input_error)?;
+            ffi_result!(rocksdb_sstfilewriter_open(writer, cpath.as_ptr()))?;
+            Ok(SstFileWriter { writer, cpath })
+        }
+    }
+
+    pub fn write_sst_files(
+        &self,
+        writers_with_cf: Vec<(&ColumnFamily, SstFileWriter)>,
+    ) -> Result<()> {
+        for (cf, writer) in writers_with_cf {
+            unsafe {
+                ffi_result!(rocksdb_sstfilewriter_finish(writer.writer))?;
+                ffi_result!(rocksdb_ingest_external_file_cf(
+                    self.0.db,
+                    cf.0,
+                    &writer.cpath.as_ptr(),
+                    1,
+                    self.0.ingest_external_file_options
+                ))?;
+            }
+        }
+        Ok(())
+    }
 }
 
 // It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
@@ -692,6 +749,49 @@ impl Iter {
     }
 }
 
+pub struct SstFileWriter {
+    writer: *mut rocksdb_sstfilewriter_t,
+    cpath: CString,
+}
+
+impl Drop for SstFileWriter {
+    fn drop(&mut self) {
+        unsafe {
+            rocksdb_sstfilewriter_destroy(self.writer);
+        }
+    }
+}
+
+impl SstFileWriter {
+    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
+        unsafe {
+            ffi_result!(rocksdb_sstfilewriter_put(
+                self.writer,
+                key.as_ptr() as *const c_char,
+                key.len(),
+                value.as_ptr() as *const c_char,
+                value.len(),
+            ))
+        }
+    }
+
+    pub fn insert_empty(&mut self, key: &[u8]) -> Result<()> {
+        self.insert(key, &[])
+    }
+
+    pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
+        unsafe {
+            ffi_result!(rocksdb_sstfilewriter_merge(
+                self.writer,
+                key.as_ptr() as *const c_char,
+                key.len(),
+                value.as_ptr() as *const c_char,
+                value.len(),
+            ))
+        }
+    }
+}
+
 fn convert_error(ptr: *const c_char) -> Error {
     let message = unsafe {
         let s = CStr::from_ptr(ptr).to_string_lossy().into_owned();
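Taken together, these bindings expose RocksDB's standard SST ingestion flow: entries are written to an `SstFileWriter` in ascending key order, `rocksdb_sstfilewriter_finish` seals the file, and `rocksdb_ingest_external_file_cf` moves it into a column family. A minimal sketch of how the crate-internal API above fits together (assuming an open `Db` and one of its `ColumnFamily` handles; illustrative only, not code from the patch):

```rust
use std::io::Result;

// Sketch: build one sorted, key-only SST file and ingest it into a column family.
fn ingest_sorted_keys(db: &Db, cf: &ColumnFamily, mut keys: Vec<Vec<u8>>) -> Result<()> {
    // RocksDB rejects SST files whose entries are not in ascending key order,
    // so sort (and deduplicate) before writing.
    keys.sort_unstable();
    keys.dedup();
    let mut sst = db.new_sst_file()?;
    for key in keys {
        sst.insert_empty(&key)?; // key-only entries, as the index column families use
    }
    // Finishes the SST file and ingests it into the given column family.
    db.write_sst_files(vec![(cf, sst)])
}
```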
diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs
index 6fe575a4..133ec17e 100644
--- a/lib/src/storage/mod.rs
+++ b/lib/src/storage/mod.rs
@@ -1,5 +1,5 @@
 use crate::error::invalid_data_error;
-use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
+use crate::model::{GraphNameRef, NamedOrBlankNodeRef, Quad, QuadRef, TermRef};
 use crate::storage::binary_encoder::{
     decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
     write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
@@ -9,13 +9,19 @@ use crate::storage::binary_encoder::{
 use crate::storage::numeric_encoder::{
     insert_term, remove_term, EncodedQuad, EncodedTerm, StrHash, StrLookup,
 };
+#[cfg(not(target_arch = "wasm32"))]
+use backend::SstFileWriter;
 use backend::{
     ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
     MergeOperator, WriteBatchWithIndex,
 };
+#[cfg(not(target_arch = "wasm32"))]
+use std::collections::{hash_map, HashMap, HashSet};
 use std::ffi::CString;
 use std::io::Result;
 #[cfg(not(target_arch = "wasm32"))]
+use std::mem::take;
+#[cfg(not(target_arch = "wasm32"))]
 use std::path::Path;
 
 mod backend;
@@ -1033,6 +1039,218 @@ impl StorageWriter {
     }
 }
 
+/// Creates a database from a dataset file.
+#[cfg(not(target_arch = "wasm32"))]
+pub struct BulkLoader {
+    storage: Storage,
+    id2str: HashMap<StrHash, (i32, Box<str>)>,
+    quads: HashSet<EncodedQuad>,
+    triples: HashSet<EncodedQuad>,
+    graphs: HashSet<EncodedTerm>,
+}
+
+#[cfg(not(target_arch = "wasm32"))]
+impl BulkLoader {
+    pub fn new(path: &Path) -> Result<Self> {
+        Ok(Self {
+            storage: Storage::open(path, true)?, //TODO: remove bulk option
+            id2str: HashMap::default(),
+            quads: HashSet::default(),
+            triples: HashSet::default(),
+            graphs: HashSet::default(),
+        })
+    }
+
+    pub fn load(&mut self, quads: impl IntoIterator<Item = Result<Quad>>) -> Result<()> {
+        let mut count = 0;
+        for quad in quads {
+            let quad = quad?;
+            let encoded = EncodedQuad::from(quad.as_ref());
+            if quad.graph_name.is_default_graph() {
+                if self.triples.insert(encoded.clone()) {
+                    self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
+                    self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
+                    self.insert_term(quad.object.as_ref(), &encoded.object);
+                }
+            } else if self.quads.insert(encoded.clone()) {
+                self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
+                self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
+                self.insert_term(quad.object.as_ref(), &encoded.object);
+                if self.graphs.insert(encoded.graph_name.clone()) {
+                    self.insert_term(
+                        match quad.graph_name.as_ref() {
+                            GraphNameRef::NamedNode(n) => n.into(),
+                            GraphNameRef::BlankNode(n) => n.into(),
+                            GraphNameRef::DefaultGraph => unreachable!(),
+                        },
+                        &encoded.graph_name,
+                    );
+                }
+            }
+            count += 1;
+            if count % (1024 * 1024) == 0 {
+                self.save()?;
+            }
+        }
+        self.save()?;
+        self.storage.compact()
+    }
+
+    fn save(&mut self) -> Result<()> {
+        let mut to_load = Vec::new();
+
+        // id2str
+        if !self.id2str.is_empty() {
+            let mut id2str = take(&mut self.id2str)
+                .into_iter()
+                .map(|(k, v)| (k.to_be_bytes(), v))
+                .collect::<Vec<_>>();
+            id2str.sort();
+            let mut id2str_sst = self.storage.db.new_sst_file()?;
+            let mut buffer = Vec::new();
+            for (k, (count, v)) in id2str {
+                buffer.extend_from_slice(&count.to_be_bytes());
+                buffer.extend_from_slice(v.as_bytes());
+                id2str_sst.merge(&k, &buffer)?;
+                buffer.clear();
+            }
+            to_load.push((&self.storage.id2str_cf, id2str_sst));
+        }
+
+        if !self.triples.is_empty() {
+            to_load.push((
+                &self.storage.dspo_cf,
+                self.add_keys_to_sst(
+                    self.triples.iter().map(|quad| {
+                        encode_term_triple(&quad.subject, &quad.predicate, &quad.object)
+                    }),
+                )?,
+            ));
+            to_load.push((
+                &self.storage.dpos_cf,
+                self.add_keys_to_sst(
+                    self.triples.iter().map(|quad| {
+                        encode_term_triple(&quad.predicate, &quad.object, &quad.subject)
+                    }),
+                )?,
+            ));
+            to_load.push((
+                &self.storage.dosp_cf,
+                self.add_keys_to_sst(
+                    self.triples.iter().map(|quad| {
+                        encode_term_triple(&quad.object, &quad.subject, &quad.predicate)
+                    }),
+                )?,
+            ));
+            self.triples.clear();
+        }
+
+        if !self.quads.is_empty() {
+            let graphs = take(&mut self.graphs);
+            to_load.push((
+                &self.storage.graphs_cf,
+                self.add_keys_to_sst(graphs.into_iter().map(|g| encode_term(&g)))?,
+            ));
+
+            to_load.push((
+                &self.storage.gspo_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.graph_name,
+                        &quad.subject,
+                        &quad.predicate,
+                        &quad.object,
+                    )
+                }))?,
+            ));
+            to_load.push((
+                &self.storage.gpos_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.graph_name,
+                        &quad.predicate,
+                        &quad.object,
+                        &quad.subject,
+                    )
+                }))?,
+            ));
+            to_load.push((
+                &self.storage.gosp_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.graph_name,
+                        &quad.object,
+                        &quad.subject,
                        &quad.predicate,
+                    )
+                }))?,
+            ));
+            to_load.push((
+                &self.storage.spog_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.subject,
+                        &quad.predicate,
+                        &quad.object,
+                        &quad.graph_name,
+                    )
+                }))?,
+            ));
+            to_load.push((
+                &self.storage.posg_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.predicate,
+                        &quad.object,
+                        &quad.subject,
+                        &quad.graph_name,
+                    )
+                }))?,
+            ));
+            to_load.push((
+                &self.storage.ospg_cf,
+                self.add_keys_to_sst(self.quads.iter().map(|quad| {
+                    encode_term_quad(
+                        &quad.object,
+                        &quad.subject,
+                        &quad.predicate,
+                        &quad.graph_name,
+                    )
+                }))?,
+            ));
+            self.quads.clear();
+        }
+
+        self.storage.db.write_sst_files(to_load)
+    }
+
+    fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) {
+        insert_term(
+            term,
+            encoded,
+            &mut |key, value| match self.id2str.entry(*key) {
+                hash_map::Entry::Occupied(mut e) => {
+                    let e = e.get_mut();
+                    e.0 = e.0.wrapping_add(1);
+                }
+                hash_map::Entry::Vacant(e) => {
+                    e.insert((1, value.into()));
+                }
+            },
+        )
+    }
+
+    fn add_keys_to_sst(&self, values: impl Iterator<Item = Vec<u8>>) -> Result<SstFileWriter> {
+        let mut values = values.collect::<Vec<_>>();
+        values.sort_unstable();
+        let mut sst = self.storage.db.new_sst_file()?;
+        for t in values {
+            sst.insert_empty(&t)?;
+        }
+        Ok(sst)
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/lib/src/store.rs b/lib/src/store.rs
index 986d068b..42cb5532 100644
--- a/lib/src/store.rs
+++ b/lib/src/store.rs
@@ -23,7 +23,8 @@
 //! };
 //! # Result::<_,Box<dyn std::error::Error>>::Ok(())
 //! ```
-use crate::io::{DatasetFormat, GraphFormat};
+use crate::error::invalid_input_error;
+use crate::io::{DatasetFormat, DatasetParser, GraphFormat};
 use crate::model::*;
 use crate::sparql::{
     evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
@@ -31,6 +32,8 @@ use crate::sparql::{
 };
 use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
 use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
+#[cfg(not(target_arch = "wasm32"))]
+use crate::storage::BulkLoader;
 use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
 use std::io::{BufRead, Write};
 #[cfg(not(target_arch = "wasm32"))]
@@ -574,8 +577,9 @@ impl Store {
     /// Creates a store efficiently from a dataset file.
     ///
-    /// Warning: This functions is optimized for performances and saves the triples in a not atomic way.
-    /// If the parsing fails in the middle of the file, only a part of it may be written to the store.
+    /// Warning: This function is optimized for speed and might eat a lot of memory.
+    ///
+    /// Warning: If the parsing fails in the middle of the file, only a part of it may be written to the store.
     ///
     /// Usage example:
     /// ```
@@ -606,14 +610,13 @@ impl Store {
         format: DatasetFormat,
         base_iri: Option<&str>,
     ) -> io::Result<()> {
-        let storage = Storage::open(path, false)?;
-        {
-            let mut writer = storage.simple_writer();
-            load_dataset(&mut writer, reader, format, base_iri)?;
-            writer.commit()?;
+        let mut parser = DatasetParser::from_format(format);
+        if let Some(base_iri) = base_iri {
+            parser = parser
+                .with_base_iri(base_iri)
+                .map_err(invalid_input_error)?;
         }
-        storage.flush()?;
-        storage.compact()
+        BulkLoader::new(path)?.load(parser.read_quads(reader)?)
     }
 }
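The public entry point keeps its signature; only the implementation now routes through the parser plus `BulkLoader`. A minimal end-to-end sketch of the bulk path (the `example.db` directory name is illustrative):

```rust
use oxigraph::io::DatasetFormat;
use oxigraph::store::Store;
use std::io::Cursor;
use std::path::Path;

fn main() -> std::io::Result<()> {
    let data = "<http://example.com/s> <http://example.com/p> <http://example.com/o> .";
    // Parses the dataset and writes it through the SST-based bulk loader.
    Store::create_from_dataset(
        Path::new("example.db"),
        Cursor::new(data),
        DatasetFormat::NQuads,
        None,
    )?;
    // The result is a regular on-disk store that can be reopened and queried.
    let store = Store::open(Path::new("example.db"))?;
    assert_eq!(store.iter().count(), 1);
    Ok(())
}
```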
     }
 }
diff --git a/lib/tests/store.rs b/lib/tests/store.rs
index b43217bb..e3086211 100644
--- a/lib/tests/store.rs
+++ b/lib/tests/store.rs
@@ -2,6 +2,9 @@ use oxigraph::io::{DatasetFormat, GraphFormat};
 use oxigraph::model::vocab::{rdf, xsd};
 use oxigraph::model::*;
 use oxigraph::store::Store;
+use rand::random;
+use std::env::temp_dir;
+use std::fs::remove_dir_all;
 use std::io;
 use std::io::Cursor;
 use std::process::Command;
@@ -99,6 +102,17 @@ fn test_load_dataset() -> io::Result<()> {
     Ok(())
 }
 
+#[test]
+fn test_bulk_load_dataset() -> io::Result<()> {
+    let temp = temp_dir().join(random::<u128>().to_string());
+    Store::create_from_dataset(&temp, Cursor::new(DATA), DatasetFormat::TriG, None)?;
+    let store = Store::open(&temp)?;
+    for q in quads(GraphNameRef::DefaultGraph) {
+        assert!(store.contains(q)?);
+    }
+    remove_dir_all(&temp)
+}
+
 #[test]
 fn test_dump_graph() -> io::Result<()> {
     let store = Store::new()?;
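One caveat on `test_bulk_load_dataset` as written: `remove_dir_all` only runs when every assertion passes, so a failing run leaves the random directory behind under the system temp dir. A drop guard in the spirit of the benches' `TempDir` (whose definition is not part of this diff; the shape below is an assumption consistent with the `path.0` accesses above) cleans up even on panic:

```rust
use rand::random;
use std::env::temp_dir;
use std::fs::remove_dir_all;
use std::path::PathBuf;

// Removes its directory when dropped, even if the test panicked first.
struct TempDir(PathBuf);

impl Default for TempDir {
    fn default() -> Self {
        Self(temp_dir().join(format!("oxigraph-test-{}", random::<u128>())))
    }
}

impl Drop for TempDir {
    fn drop(&mut self) {
        // Best-effort cleanup: ignore the error if the directory was never created.
        let _ = remove_dir_all(&self.0);
    }
}
```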