Simplifies bulk load API

pull/171/head
Tpt 3 years ago
parent 7b1c4e0ad5
commit 6f44a5956b
  1. 40
      lib/benches/store.rs
  2. 27
      lib/src/storage/backend/rocksdb.rs
  3. 48
      lib/src/storage/mod.rs
  4. 67
      lib/src/store.rs
  5. 10
      lib/tests/store.rs

@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use oxhttp::model::{Method, Request, Status};
use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::io::GraphFormat;
use oxigraph::model::GraphNameRef;
use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::Store;
@ -36,13 +36,8 @@ fn store_load(c: &mut Criterion) {
group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| {
b.iter(|| {
let path = TempDir::default();
Store::create_from_dataset(
&path.0,
Cursor::new(&data),
DatasetFormat::NQuads,
None,
)
.unwrap();
let mut store = Store::open(&path.0).unwrap();
do_bulk_load(&mut store, &data);
})
});
}
@ -59,13 +54,8 @@ fn store_load(c: &mut Criterion) {
group.bench_function("load BSBM explore 10000 in on disk with bulk load", |b| {
b.iter(|| {
let path = TempDir::default();
Store::create_from_dataset(
&path.0,
Cursor::new(&data),
DatasetFormat::NQuads,
None,
)
.unwrap();
let mut store = Store::open(&path.0).unwrap();
do_bulk_load(&mut store, &data);
})
});
}
@ -83,6 +73,18 @@ fn do_load(store: &Store, data: &[u8]) {
store.optimize().unwrap();
}
fn do_bulk_load(store: &mut Store, data: &[u8]) {
store
.bulk_load_graph(
Cursor::new(&data),
GraphFormat::NTriples,
GraphNameRef::DefaultGraph,
None,
)
.unwrap();
store.optimize().unwrap();
}
fn store_query_and_update(c: &mut Criterion) {
let mut data = Vec::new();
read_data("explore-1000.nt.zst")
@ -114,8 +116,8 @@ fn store_query_and_update(c: &mut Criterion) {
group.sample_size(10);
{
let memory_store = Store::new().unwrap();
do_load(&memory_store, &data);
let mut memory_store = Store::new().unwrap();
do_bulk_load(&mut memory_store, &data);
group.bench_function("BSBM explore 1000 query in memory", |b| {
b.iter(|| run_operation(&memory_store, &query_operations))
});
@ -126,8 +128,8 @@ fn store_query_and_update(c: &mut Criterion) {
{
let path = TempDir::default();
let disk_store = Store::open(&path.0).unwrap();
do_load(&disk_store, &data);
let mut disk_store = Store::open(&path.0).unwrap();
do_bulk_load(&mut disk_store, &data);
group.bench_function("BSBM explore 1000 query on disk", |b| {
b.iter(|| run_operation(&disk_store, &query_operations))
});

@ -115,32 +115,17 @@ impl Db {
temp_dir()
}
.join("oxigraph-temp-rocksdb");
Ok(Self(Arc::new(Self::do_open(
&path,
column_families,
true,
false,
)?)))
Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?)))
}
pub fn open(
path: &Path,
column_families: Vec<ColumnFamilyDefinition>,
for_bulk_load: bool,
) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(
path,
column_families,
false,
for_bulk_load,
)?)))
pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?)))
}
fn do_open(
path: &Path,
mut column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool,
for_bulk_load: bool,
) -> Result<DbHandler> {
let c_path = path_to_cstring(path)?;
@ -165,10 +150,6 @@ impl Db {
.try_into()
.unwrap(),
);
if for_bulk_load {
rocksdb_options_prepare_for_bulk_load(options);
rocksdb_options_set_error_if_exists(options, 1);
}
let block_based_table_options = rocksdb_block_based_options_create();
assert!(
!block_based_table_options.is_null(),
@ -566,12 +547,10 @@ impl Reader {
Ok(self.get(column_family, key)?.is_some()) //TODO: optimize
}
#[must_use]
pub fn iter(&self, column_family: &ColumnFamily) -> Result<Iter> {
self.scan_prefix(column_family, &[])
}
#[must_use]
pub fn scan_prefix(&self, column_family: &ColumnFamily, prefix: &[u8]) -> Result<Iter> {
//We generate the upper bound
let upper_bound = {

@ -69,8 +69,8 @@ impl Storage {
}
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path, for_bulk_load: bool) -> Result<Self> {
Self::setup(Db::open(path, Self::column_families(), for_bulk_load)?)
pub fn open(path: &Path) -> Result<Self> {
Self::setup(Db::open(path, Self::column_families())?)
}
fn column_families() -> Vec<ColumnFamilyDefinition> {
@ -1085,24 +1085,28 @@ impl StorageWriter {
/// Creates a database from a dataset files.
#[cfg(not(target_arch = "wasm32"))]
pub struct BulkLoader {
storage: Storage,
pub struct BulkLoader<'a> {
storage: &'a Storage,
reader: Reader,
id2str: HashMap<StrHash, (i32, Box<str>)>,
quads: HashSet<EncodedQuad>,
triples: HashSet<EncodedQuad>,
graphs: HashSet<EncodedTerm>,
buffer: Vec<u8>,
}
#[cfg(not(target_arch = "wasm32"))]
impl BulkLoader {
pub fn new(path: &Path) -> Result<Self> {
Ok(Self {
storage: Storage::open(path, true)?, //TODO: remove bulk option
impl<'a> BulkLoader<'a> {
pub fn new(storage: &'a Storage) -> Self {
Self {
storage,
reader: storage.db.reader(),
id2str: HashMap::default(),
quads: HashSet::default(),
triples: HashSet::default(),
graphs: HashSet::default(),
})
buffer: Vec::new(),
}
}
pub fn load(&mut self, quads: impl IntoIterator<Item = Result<Quad>>) -> Result<()> {
@ -1110,17 +1114,36 @@ impl BulkLoader {
for quad in quads {
let quad = quad?;
let encoded = EncodedQuad::from(quad.as_ref());
self.buffer.clear();
if quad.graph_name.is_default_graph() {
if self.triples.insert(encoded.clone()) {
write_spo_quad(&mut self.buffer, &encoded);
if !self
.reader
.contains_key(&self.storage.dspo_cf, &self.buffer)?
&& self.triples.insert(encoded.clone())
{
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?;
}
} else if self.quads.insert(encoded.clone()) {
} else {
write_spog_quad(&mut self.buffer, &encoded);
if !self
.reader
.contains_key(&self.storage.spog_cf, &self.buffer)?
&& self.quads.insert(encoded.clone())
{
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
self.insert_term(quad.object.as_ref(), &encoded.object)?;
if self.graphs.insert(encoded.graph_name.clone()) {
self.buffer.clear();
write_term(&mut self.buffer, &encoded.graph_name);
if !self
.reader
.contains_key(&self.storage.graphs_cf, &self.buffer)?
&& self.graphs.insert(encoded.graph_name.clone())
{
self.insert_term(
match quad.graph_name.as_ref() {
GraphNameRef::NamedNode(n) => n.into(),
@ -1131,6 +1154,7 @@ impl BulkLoader {
)?;
}
}
}
count += 1;
if count % (1024 * 1024) == 0 {
self.save()?;

@ -24,7 +24,7 @@
//! # Result::<_,Box<dyn std::error::Error>>::Ok(())
//! ```
use crate::error::invalid_input_error;
use crate::io::{DatasetFormat, DatasetParser, GraphFormat};
use crate::io::{DatasetFormat, DatasetParser, GraphFormat, GraphParser};
use crate::model::*;
use crate::sparql::{
evaluate_query, evaluate_update, EvaluationError, Query, QueryOptions, QueryResults, Update,
@ -94,7 +94,7 @@ impl Store {
#[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Self {
storage: Storage::open(path.as_ref(), false)?,
storage: Storage::open(path.as_ref())?,
})
}
@ -578,11 +578,11 @@ impl Store {
self.storage.compact()
}
/// Creates a store efficiently from a dataset file.
/// Loads a dataset file efficiently into the store.
///
/// Warning: This function is optimized for speed and might eat a lot of memory.
///
/// Warning: If the parsing fails in the middle of the file, only a part of it may be written to the store.
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Usage example:
/// ```
@ -590,14 +590,13 @@ impl Store {
/// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*;
///
/// let store = Store::new()?;
/// let mut store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// Store::create_from_dataset("example.db", file.as_ref(), DatasetFormat::NQuads, None)?;
/// store.bulk_load_dataset(file.as_ref(), DatasetFormat::NQuads, None)?;
///
/// // we inspect the store contents
/// let store = Store::open("example.db")?;
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
@ -607,8 +606,8 @@ impl Store {
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
/// Errors related to data loading into the store use the other error kinds.
#[cfg(not(target_arch = "wasm32"))]
pub fn create_from_dataset(
path: &Path,
pub fn bulk_load_dataset(
&mut self,
reader: impl BufRead,
format: DatasetFormat,
base_iri: Option<&str>,
@ -619,7 +618,55 @@ impl Store {
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
BulkLoader::new(path)?.load(parser.read_quads(reader)?)
BulkLoader::new(&self.storage).load(parser.read_quads(reader)?)
}
/// Loads a dataset file efficiently into the store.
///
/// Warning: This function is optimized for speed and might eat a lot of memory.
///
/// Warning: This method is not atomic. If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::GraphFormat;
/// use oxigraph::model::*;
///
/// let mut store = Store::new()?;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> .";
/// store.bulk_load_graph(file.as_ref(), GraphFormat::NTriples, &GraphName::DefaultGraph, None)?;
///
/// // we inspect the store contents
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, GraphNameRef::DefaultGraph))?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
///
/// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
/// Errors related to data loading into the store use the other error kinds.
pub fn bulk_load_graph<'a>(
&mut self,
reader: impl BufRead,
format: GraphFormat,
to_graph_name: impl Into<GraphNameRef<'a>>,
base_iri: Option<&str>,
) -> io::Result<()> {
let mut parser = GraphParser::from_format(format);
if let Some(base_iri) = base_iri {
parser = parser
.with_base_iri(base_iri)
.map_err(invalid_input_error)?;
}
let to_graph_name = to_graph_name.into();
BulkLoader::new(&self.storage).load(
parser
.read_triples(reader)?
.map(|r| Ok(r?.in_graph(to_graph_name.into_owned()))),
)
}
}

@ -2,9 +2,6 @@ use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::vocab::{rdf, xsd};
use oxigraph::model::*;
use oxigraph::store::Store;
use rand::random;
use std::env::temp_dir;
use std::fs::remove_dir_all;
use std::io;
use std::io::Cursor;
use std::process::Command;
@ -104,13 +101,12 @@ fn test_load_dataset() -> io::Result<()> {
#[test]
fn test_bulk_load_dataset() -> io::Result<()> {
let temp = temp_dir().join(random::<usize>().to_string());
Store::create_from_dataset(&temp, Cursor::new(DATA), DatasetFormat::TriG, None)?;
let store = Store::open(&temp)?;
let mut store = Store::new().unwrap();
store.bulk_load_dataset(Cursor::new(DATA), DatasetFormat::TriG, None)?;
for q in quads(GraphNameRef::DefaultGraph) {
assert!(store.contains(q)?);
}
remove_dir_all(&temp)
Ok(())
}
#[test]

Loading…
Cancel
Save