Adds bulk load by directly writing RocksDB SSTs

pull/171/head
Tpt 3 years ago
parent 1d3108d27f
commit 196d6d6576
  1. 32
      lib/benches/store.rs
  2. 3
      lib/src/storage/backend/mod.rs
  3. 102
      lib/src/storage/backend/rocksdb.rs
  4. 220
      lib/src/storage/mod.rs
  5. 23
      lib/src/store.rs
  6. 14
      lib/tests/store.rs

@ -11,6 +11,7 @@ use std::io::{BufRead, BufReader, Cursor, Read};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
fn store_load(c: &mut Criterion) { fn store_load(c: &mut Criterion) {
{
let mut data = Vec::new(); let mut data = Vec::new();
read_data("explore-1000.nt.zst") read_data("explore-1000.nt.zst")
.read_to_end(&mut data) .read_to_end(&mut data)
@ -35,11 +36,40 @@ fn store_load(c: &mut Criterion) {
group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| { group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| {
b.iter(|| { b.iter(|| {
let path = TempDir::default(); let path = TempDir::default();
Store::create_from_dataset(&path.0, Cursor::new(&data), DatasetFormat::NQuads, None) Store::create_from_dataset(
&path.0,
Cursor::new(&data),
DatasetFormat::NQuads,
None,
)
.unwrap();
})
});
}
{
let mut data = Vec::new();
read_data("explore-10000.nt.zst")
.read_to_end(&mut data)
.unwrap();
let mut group = c.benchmark_group("store load large");
group.throughput(Throughput::Bytes(data.len() as u64));
group.sample_size(10);
group.bench_function("load BSBM explore 10000 in on disk with bulk load", |b| {
b.iter(|| {
let path = TempDir::default();
Store::create_from_dataset(
&path.0,
Cursor::new(&data),
DatasetFormat::NQuads,
None,
)
.unwrap(); .unwrap();
}) })
}); });
} }
}
fn do_load(store: &Store, data: &[u8]) { fn do_load(store: &Store, data: &[u8]) {
store store

@ -7,7 +7,8 @@ pub use fallback::{
}; };
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub use rocksdb::{ pub use rocksdb::{
ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, WriteBatchWithIndex, ColumnFamily, ColumnFamilyDefinition, Db, Iter, MergeOperator, SstFileWriter,
WriteBatchWithIndex,
}; };
use std::ffi::CString; use std::ffi::CString;

@ -8,13 +8,14 @@ use crate::error::invalid_input_error;
use crate::storage::backend::{CompactionAction, CompactionFilter}; use crate::storage::backend::{CompactionAction, CompactionFilter};
use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t}; use libc::{self, c_char, c_int, c_uchar, c_void, free, size_t};
use oxrocksdb_sys::*; use oxrocksdb_sys::*;
use rand::random;
use std::borrow::Borrow; use std::borrow::Borrow;
use std::env::temp_dir; use std::env::temp_dir;
use std::ffi::{CStr, CString}; use std::ffi::{CStr, CString};
use std::io::{Error, ErrorKind, Result}; use std::io::{Error, ErrorKind, Result};
use std::iter::Zip; use std::iter::Zip;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::{ptr, slice}; use std::{ptr, slice};
@ -60,6 +61,8 @@ struct DbHandler {
read_options: *mut rocksdb_readoptions_t, read_options: *mut rocksdb_readoptions_t,
write_options: *mut rocksdb_writeoptions_t, write_options: *mut rocksdb_writeoptions_t,
flush_options: *mut rocksdb_flushoptions_t, flush_options: *mut rocksdb_flushoptions_t,
env_options: *mut rocksdb_envoptions_t,
ingest_external_file_options: *mut rocksdb_ingestexternalfileoptions_t,
compaction_options: *mut rocksdb_compactoptions_t, compaction_options: *mut rocksdb_compactoptions_t,
block_based_table_options: *mut rocksdb_block_based_table_options_t, block_based_table_options: *mut rocksdb_block_based_table_options_t,
env: Option<*mut rocksdb_env_t>, env: Option<*mut rocksdb_env_t>,
@ -67,6 +70,7 @@ struct DbHandler {
cf_handles: Vec<*mut rocksdb_column_family_handle_t>, cf_handles: Vec<*mut rocksdb_column_family_handle_t>,
cf_options: Vec<*mut rocksdb_options_t>, cf_options: Vec<*mut rocksdb_options_t>,
cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>, cf_compaction_filters: Vec<*mut rocksdb_compactionfilter_t>,
path: PathBuf,
} }
impl Drop for DbHandler { impl Drop for DbHandler {
@ -82,6 +86,8 @@ impl Drop for DbHandler {
rocksdb_readoptions_destroy(self.read_options); rocksdb_readoptions_destroy(self.read_options);
rocksdb_writeoptions_destroy(self.write_options); rocksdb_writeoptions_destroy(self.write_options);
rocksdb_flushoptions_destroy(self.flush_options); rocksdb_flushoptions_destroy(self.flush_options);
rocksdb_envoptions_destroy(self.env_options);
rocksdb_ingestexternalfileoptions_destroy(self.ingest_external_file_options);
rocksdb_compactoptions_destroy(self.compaction_options); rocksdb_compactoptions_destroy(self.compaction_options);
rocksdb_options_destroy(self.options); rocksdb_options_destroy(self.options);
rocksdb_block_based_options_destroy(self.block_based_table_options); rocksdb_block_based_options_destroy(self.block_based_table_options);
@ -159,6 +165,7 @@ impl Db {
); );
if for_bulk_load { if for_bulk_load {
rocksdb_options_prepare_for_bulk_load(options); rocksdb_options_prepare_for_bulk_load(options);
rocksdb_options_set_error_if_exists(options, 1);
} }
let block_based_table_options = rocksdb_block_based_options_create(); let block_based_table_options = rocksdb_block_based_options_create();
assert!( assert!(
@ -292,6 +299,18 @@ impl Db {
"rocksdb_flushoptions_create returned null" "rocksdb_flushoptions_create returned null"
); );
let env_options = rocksdb_envoptions_create();
assert!(
!env_options.is_null(),
"rocksdb_envoptions_create returned null"
);
let ingest_external_file_options = rocksdb_ingestexternalfileoptions_create();
assert!(
!ingest_external_file_options.is_null(),
"rocksdb_ingestexternalfileoptions_create returned null"
);
let compaction_options = rocksdb_compactoptions_create(); let compaction_options = rocksdb_compactoptions_create();
assert!( assert!(
!compaction_options.is_null(), !compaction_options.is_null(),
@ -304,6 +323,8 @@ impl Db {
read_options, read_options,
write_options, write_options,
flush_options, flush_options,
env_options,
ingest_external_file_options,
compaction_options, compaction_options,
block_based_table_options, block_based_table_options,
env, env,
@ -311,6 +332,7 @@ impl Db {
cf_handles, cf_handles,
cf_options, cf_options,
cf_compaction_filters, cf_compaction_filters,
path: path.to_path_buf(),
}) })
} }
} }
@ -464,6 +486,41 @@ impl Db {
iter.status()?; // We makes sure there is no read problem iter.status()?; // We makes sure there is no read problem
Ok(!iter.is_valid()) Ok(!iter.is_valid())
} }
pub fn new_sst_file(&self) -> Result<SstFileWriter> {
unsafe {
let writer = rocksdb_sstfilewriter_create(self.0.env_options, self.0.options);
let cpath = CString::new(
self.0
.path
.join(random::<u128>().to_string())
.to_string_lossy()
.to_string(),
)
.map_err(invalid_input_error)?;
ffi_result!(rocksdb_sstfilewriter_open(writer, cpath.as_ptr()))?;
Ok(SstFileWriter { writer, cpath })
}
}
/// Finishes each SST file and ingests it into its target column family.
///
/// Each writer must contain at least one entry: RocksDB's
/// `sstfilewriter_finish` fails on empty files (callers guard with
/// `is_empty()` checks — TODO confirm this invariant holds for all callers).
///
/// NOTE(review): the name looks like a typo for `write_sst_files`; kept as-is
/// because renaming would break callers.
pub fn write_stt_files(
    &self,
    writers_with_cf: Vec<(&ColumnFamily, SstFileWriter)>,
) -> Result<()> {
    for (cf, writer) in writers_with_cf {
        unsafe {
            // Finalize the file on disk, then hand it to the DB. On error the
            // `SstFileWriter` drop still destroys the native writer handle.
            ffi_result!(rocksdb_sstfilewriter_finish(writer.writer))?;
            ffi_result!(rocksdb_ingest_external_file_cf(
                self.0.db,
                cf.0,
                &writer.cpath.as_ptr(),
                1,
                self.0.ingest_external_file_options
            ))?;
        }
    }
    Ok(())
}
} }
// It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope. // It is fine to not keep a lifetime: there is no way to use this type without the database being still in scope.
@ -692,6 +749,49 @@ impl Iter {
} }
} }
/// Owning wrapper around a RocksDB `rocksdb_sstfilewriter_t` plus the path of
/// the SST file it writes to (as a `CString` ready for the C API).
pub struct SstFileWriter {
    writer: *mut rocksdb_sstfilewriter_t,
    cpath: CString,
}

impl Drop for SstFileWriter {
    fn drop(&mut self) {
        unsafe {
            // Frees the native writer handle.
            // NOTE(review): this does not remove the on-disk file at `cpath`;
            // a writer dropped without being ingested presumably leaves a
            // stray SST file behind — confirm whether cleanup is needed.
            rocksdb_sstfilewriter_destroy(self.writer);
        }
    }
}

impl SstFileWriter {
    /// Appends a key/value pair. Keys must be added in ascending order.
    pub fn insert(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        unsafe {
            ffi_result!(rocksdb_sstfilewriter_put(
                self.writer,
                key.as_ptr() as *const c_char,
                key.len(),
                value.as_ptr() as *const c_char,
                value.len(),
            ))
        }
    }

    /// Appends a key with an empty value (used for index-only column families).
    pub fn insert_empty(&mut self, key: &[u8]) -> Result<()> {
        self.insert(key, &[])
    }

    /// Appends a merge operand for `key`; combined by the column family's
    /// merge operator at ingestion/read time.
    pub fn merge(&mut self, key: &[u8], value: &[u8]) -> Result<()> {
        unsafe {
            ffi_result!(rocksdb_sstfilewriter_merge(
                self.writer,
                key.as_ptr() as *const c_char,
                key.len(),
                value.as_ptr() as *const c_char,
                value.len(),
            ))
        }
    }
}
fn convert_error(ptr: *const c_char) -> Error { fn convert_error(ptr: *const c_char) -> Error {
let message = unsafe { let message = unsafe {
let s = CStr::from_ptr(ptr).to_string_lossy().into_owned(); let s = CStr::from_ptr(ptr).to_string_lossy().into_owned();

@ -1,5 +1,5 @@
use crate::error::invalid_data_error; use crate::error::invalid_data_error;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef}; use crate::model::{GraphNameRef, NamedOrBlankNodeRef, Quad, QuadRef, TermRef};
use crate::storage::binary_encoder::{ use crate::storage::binary_encoder::{
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple, decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad, write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
@ -9,13 +9,19 @@ use crate::storage::binary_encoder::{
use crate::storage::numeric_encoder::{ use crate::storage::numeric_encoder::{
insert_term, remove_term, EncodedQuad, EncodedTerm, StrHash, StrLookup, insert_term, remove_term, EncodedQuad, EncodedTerm, StrHash, StrLookup,
}; };
#[cfg(not(target_arch = "wasm32"))]
use backend::SstFileWriter;
use backend::{ use backend::{
ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter, ColumnFamily, ColumnFamilyDefinition, CompactionAction, CompactionFilter, Db, Iter,
MergeOperator, WriteBatchWithIndex, MergeOperator, WriteBatchWithIndex,
}; };
#[cfg(not(target_arch = "wasm32"))]
use std::collections::{hash_map, HashMap, HashSet};
use std::ffi::CString; use std::ffi::CString;
use std::io::Result; use std::io::Result;
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
use std::mem::take;
#[cfg(not(target_arch = "wasm32"))]
use std::path::Path; use std::path::Path;
mod backend; mod backend;
@ -1033,6 +1039,218 @@ impl StorageWriter {
} }
} }
/// Creates a database from dataset files by writing RocksDB SST files
/// directly instead of going through the write-ahead path.
#[cfg(not(target_arch = "wasm32"))]
pub struct BulkLoader {
    // Target storage, opened in bulk-load mode.
    storage: Storage,
    // Pending string dictionary entries: hash -> (reference count, string).
    id2str: HashMap<StrHash, (i32, Box<str>)>,
    // Pending named-graph quads, deduplicated within the current batch.
    quads: HashSet<EncodedQuad>,
    // Pending default-graph triples, deduplicated within the current batch.
    triples: HashSet<EncodedQuad>,
    // Named graphs first seen in the current batch.
    graphs: HashSet<EncodedTerm>,
}
#[cfg(not(target_arch = "wasm32"))]
impl BulkLoader {
/// Opens (or creates) the storage at `path` in bulk-load mode and returns a
/// loader with empty in-memory batches.
pub fn new(path: &Path) -> Result<Self> {
    let storage = Storage::open(path, true)?; //TODO: remove bulk option
    Ok(Self {
        storage,
        id2str: HashMap::new(),
        quads: HashSet::new(),
        triples: HashSet::new(),
        graphs: HashSet::new(),
    })
}
/// Loads a stream of quads into the store, flushing the in-memory batches
/// to SST files every 2^20 parsed quads, then compacts the database.
///
/// Deduplication via the `HashSet`s is per batch only; duplicates across
/// batches are resolved by RocksDB at ingestion (same encoded keys).
pub fn load(&mut self, quads: impl IntoIterator<Item = Result<Quad>>) -> Result<()> {
    let mut count = 0;
    for quad in quads {
        let quad = quad?;
        let encoded = EncodedQuad::from(quad.as_ref());
        if quad.graph_name.is_default_graph() {
            // Default-graph triples are stored in the d* column families.
            if self.triples.insert(encoded.clone()) {
                self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
                self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
                self.insert_term(quad.object.as_ref(), &encoded.object);
            }
        } else if self.quads.insert(encoded.clone()) {
            self.insert_term(quad.subject.as_ref().into(), &encoded.subject);
            self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate);
            self.insert_term(quad.object.as_ref(), &encoded.object);
            if self.graphs.insert(encoded.graph_name.clone()) {
                // First occurrence of this named graph in the batch:
                // register its name term too.
                self.insert_term(
                    match quad.graph_name.as_ref() {
                        GraphNameRef::NamedNode(n) => n.into(),
                        GraphNameRef::BlankNode(n) => n.into(),
                        // Handled by the is_default_graph branch above.
                        GraphNameRef::DefaultGraph => unreachable!(),
                    },
                    &encoded.graph_name,
                );
            }
        }
        count += 1;
        // Bound memory use: count includes duplicates, so batches are capped
        // by parsed quads, not by distinct ones.
        if count % (1024 * 1024) == 0 {
            self.save()?;
        }
    }
    self.save()?;
    self.storage.compact()
}
/// Writes the buffered batches (string dictionary, default-graph triples,
/// named-graph quads, graph names) as sorted SST files and ingests them,
/// clearing the buffers.
///
/// Fixes vs. the original: the GPOS index was encoded in
/// graph/object/subject/predicate order (duplicating GOSP) and the POSG index
/// in object/subject/predicate/graph order (duplicating OSPG); both now use
/// the permutation their column family name declares. Also renamed the
/// misleading `quads` binding for the taken `graphs` set.
fn save(&mut self) -> Result<()> {
    let mut to_load = Vec::new();

    // String dictionary: values are (reference count, string), combined by
    // the id2str merge operator so repeated flushes accumulate counts.
    if !self.id2str.is_empty() {
        let mut id2str = take(&mut self.id2str)
            .into_iter()
            .map(|(k, v)| (k.to_be_bytes(), v))
            .collect::<Vec<_>>();
        id2str.sort(); // SST writers require keys in ascending order
        let mut id2str_sst = self.storage.db.new_sst_file()?;
        let mut buffer = Vec::new();
        for (k, (count, v)) in id2str {
            buffer.extend_from_slice(&count.to_be_bytes());
            buffer.extend_from_slice(v.as_bytes());
            id2str_sst.merge(&k, &buffer)?;
            buffer.clear();
        }
        to_load.push((&self.storage.id2str_cf, id2str_sst));
    }

    // Default-graph triples: one SST per index permutation.
    if !self.triples.is_empty() {
        to_load.push((
            &self.storage.dspo_cf,
            self.add_keys_to_sst(
                self.triples
                    .iter()
                    .map(|quad| encode_term_triple(&quad.subject, &quad.predicate, &quad.object)),
            )?,
        ));
        to_load.push((
            &self.storage.dpos_cf,
            self.add_keys_to_sst(
                self.triples
                    .iter()
                    .map(|quad| encode_term_triple(&quad.predicate, &quad.object, &quad.subject)),
            )?,
        ));
        to_load.push((
            &self.storage.dosp_cf,
            self.add_keys_to_sst(
                self.triples
                    .iter()
                    .map(|quad| encode_term_triple(&quad.object, &quad.subject, &quad.predicate)),
            )?,
        ));
        self.triples.clear();
    }

    // Named-graph quads and the set of graph names.
    if !self.quads.is_empty() {
        let graphs = take(&mut self.graphs);
        to_load.push((
            &self.storage.graphs_cf,
            self.add_keys_to_sst(graphs.into_iter().map(|g| encode_term(&g)))?,
        ));
        to_load.push((
            &self.storage.gspo_cf,
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.graph_name,
                    &quad.subject,
                    &quad.predicate,
                    &quad.object,
                )
            }))?,
        ));
        to_load.push((
            &self.storage.gpos_cf,
            // Fixed: graph/predicate/object/subject (was graph/object/subject/predicate).
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.graph_name,
                    &quad.predicate,
                    &quad.object,
                    &quad.subject,
                )
            }))?,
        ));
        to_load.push((
            &self.storage.gosp_cf,
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.graph_name,
                    &quad.object,
                    &quad.subject,
                    &quad.predicate,
                )
            }))?,
        ));
        to_load.push((
            &self.storage.spog_cf,
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.subject,
                    &quad.predicate,
                    &quad.object,
                    &quad.graph_name,
                )
            }))?,
        ));
        to_load.push((
            &self.storage.posg_cf,
            // Fixed: predicate/object/subject/graph (was object/subject/predicate/graph).
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.predicate,
                    &quad.object,
                    &quad.subject,
                    &quad.graph_name,
                )
            }))?,
        ));
        to_load.push((
            &self.storage.ospg_cf,
            self.add_keys_to_sst(self.quads.iter().map(|quad| {
                encode_term_quad(
                    &quad.object,
                    &quad.subject,
                    &quad.predicate,
                    &quad.graph_name,
                )
            }))?,
        ));
        self.quads.clear();
    }
    // NOTE(review): `write_stt_files` looks like a typo for `write_sst_files`.
    self.storage.db.write_stt_files(to_load)
}
fn insert_term(&mut self, term: TermRef<'_>, encoded: &EncodedTerm) {
insert_term(
term,
encoded,
&mut |key, value| match self.id2str.entry(*key) {
hash_map::Entry::Occupied(mut e) => {
let e = e.get_mut();
e.0 = e.0.wrapping_add(1);
}
hash_map::Entry::Vacant(e) => {
e.insert((1, value.into()));
}
},
)
}
/// Sorts the given keys (SST files require ascending key order) and writes
/// them with empty values into a fresh SST file.
fn add_keys_to_sst(&self, values: impl Iterator<Item = Vec<u8>>) -> Result<SstFileWriter> {
    let mut keys: Vec<Vec<u8>> = values.collect();
    keys.sort_unstable();
    let mut writer = self.storage.db.new_sst_file()?;
    keys.iter().try_for_each(|key| writer.insert_empty(key))?;
    Ok(writer)
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

@ -23,7 +23,8 @@
//! }; //! };
//! # Result::<_,Box<dyn std::error::Error>>::Ok(()) //! # Result::<_,Box<dyn std::error::Error>>::Ok(())
//! ``` //! ```
use crate::io::{DatasetFormat, GraphFormat}; use crate::error::invalid_input_error;
use crate::io::{DatasetFormat, DatasetParser, GraphFormat};
use crate::model::*; use crate::model::*;
use crate::sparql::{ use crate::sparql::{
evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update, evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
@ -31,6 +32,8 @@ use crate::sparql::{
}; };
use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph}; use crate::storage::io::{dump_dataset, dump_graph, load_dataset, load_graph};
use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm}; use crate::storage::numeric_encoder::{Decoder, EncodedQuad, EncodedTerm};
#[cfg(not(target_arch = "wasm32"))]
use crate::storage::BulkLoader;
use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage}; use crate::storage::{ChainedDecodingQuadIterator, DecodingGraphIterator, Storage};
use std::io::{BufRead, Write}; use std::io::{BufRead, Write};
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
@ -574,8 +577,9 @@ impl Store {
/// Creates a store efficiently from a dataset file. /// Creates a store efficiently from a dataset file.
/// ///
/// Warning: This functions is optimized for performances and saves the triples in a not atomic way. /// Warning: This function is optimized for speed and might eat a lot of memory.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store. ///
/// Warning: If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// ///
/// Usage example: /// Usage example:
/// ``` /// ```
@ -606,14 +610,13 @@ impl Store {
format: DatasetFormat, format: DatasetFormat,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> io::Result<()> { ) -> io::Result<()> {
let storage = Storage::open(path, false)?; let mut parser = DatasetParser::from_format(format);
{ if let Some(base_iri) = base_iri {
let mut writer = storage.simple_writer(); parser = parser
load_dataset(&mut writer, reader, format, base_iri)?; .with_base_iri(base_iri)
writer.commit()?; .map_err(invalid_input_error)?;
} }
storage.flush()?; BulkLoader::new(path)?.load(parser.read_quads(reader)?)
storage.compact()
} }
} }

@ -2,6 +2,9 @@ use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::vocab::{rdf, xsd}; use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*; use oxigraph::model::*;
use oxigraph::store::Store; use oxigraph::store::Store;
use rand::random;
use std::env::temp_dir;
use std::fs::remove_dir_all;
use std::io; use std::io;
use std::io::Cursor; use std::io::Cursor;
use std::process::Command; use std::process::Command;
@ -99,6 +102,17 @@ fn test_load_dataset() -> io::Result<()> {
Ok(()) Ok(())
} }
#[test]
// Bulk-loads the TriG fixture into a fresh on-disk store and checks every
// expected default-graph quad is present.
// NOTE(review): if an assertion fails, `remove_dir_all` never runs and the
// temp directory is leaked — consider a guard/defer-style cleanup.
fn test_bulk_load_dataset() -> io::Result<()> {
    let temp = temp_dir().join(random::<usize>().to_string());
    Store::create_from_dataset(&temp, Cursor::new(DATA), DatasetFormat::TriG, None)?;
    let store = Store::open(&temp)?;
    for q in quads(GraphNameRef::DefaultGraph) {
        assert!(store.contains(q)?);
    }
    remove_dir_all(&temp)
}
#[test] #[test]
fn test_dump_graph() -> io::Result<()> { fn test_dump_graph() -> io::Result<()> {
let store = Store::new()?; let store = Store::new()?;

Loading…
Cancel
Save