RocksDB: bulk load

TODO: loading performs many lookups for GC, so the RocksDB bulk-load option (`for_bulk_load`) is not very useful yet
pull/171/head
Tpt 3 years ago
parent 98f9a307b8
commit 986d3e60bb
  1. 9
      lib/benches/store.rs
  2. 24
      lib/src/storage/backend/rocksdb.rs
  3. 4
      lib/src/storage/mod.rs
  4. 41
      lib/src/store.rs

@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use criterion::{criterion_group, criterion_main, Criterion, Throughput};
use oxhttp::model::{Method, Request, Status}; use oxhttp::model::{Method, Request, Status};
use oxigraph::io::GraphFormat; use oxigraph::io::{DatasetFormat, GraphFormat};
use oxigraph::model::GraphNameRef; use oxigraph::model::GraphNameRef;
use oxigraph::sparql::{Query, QueryResults, Update}; use oxigraph::sparql::{Query, QueryResults, Update};
use oxigraph::store::Store; use oxigraph::store::Store;
@ -32,6 +32,13 @@ fn store_load(c: &mut Criterion) {
do_load(&store, &data); do_load(&store, &data);
}) })
}); });
group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| {
b.iter(|| {
let path = TempDir::default();
Store::create_from_dataset(&path.0, Cursor::new(&data), DatasetFormat::NQuads, None)
.unwrap();
})
});
} }
fn do_load(store: &Store, data: &[u8]) { fn do_load(store: &Store, data: &[u8]) {

@ -95,17 +95,32 @@ impl Db {
temp_dir() temp_dir()
} }
.join("oxigraph-temp-rocksdb"); .join("oxigraph-temp-rocksdb");
Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) Ok(Self(Arc::new(Self::do_open(
&path,
column_families,
true,
false,
)?)))
} }
pub fn open(path: &Path, column_families: Vec<ColumnFamilyDefinition>) -> Result<Self> { pub fn open(
Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) path: &Path,
column_families: Vec<ColumnFamilyDefinition>,
for_bulk_load: bool,
) -> Result<Self> {
Ok(Self(Arc::new(Self::do_open(
path,
column_families,
false,
for_bulk_load,
)?)))
} }
fn do_open( fn do_open(
path: &Path, path: &Path,
mut column_families: Vec<ColumnFamilyDefinition>, mut column_families: Vec<ColumnFamilyDefinition>,
in_memory: bool, in_memory: bool,
for_bulk_load: bool,
) -> Result<DbHandler> { ) -> Result<DbHandler> {
let c_path = CString::new( let c_path = CString::new(
path.to_str() path.to_str()
@ -134,6 +149,9 @@ impl Db {
.try_into() .try_into()
.unwrap(), .unwrap(),
); );
if for_bulk_load {
rocksdb_options_prepare_for_bulk_load(options);
}
let env = if in_memory { let env = if in_memory {
let env = rocksdb_create_mem_env(); let env = rocksdb_create_mem_env();

@ -61,8 +61,8 @@ impl Storage {
} }
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn open(path: &Path) -> Result<Self> { pub fn open(path: &Path, for_bulk_load: bool) -> Result<Self> {
Self::setup(Db::open(path, Self::column_families())?) Self::setup(Db::open(path, Self::column_families(), for_bulk_load)?)
} }
fn column_families() -> Vec<ColumnFamilyDefinition> { fn column_families() -> Vec<ColumnFamilyDefinition> {

@ -88,7 +88,7 @@ impl Store {
#[cfg(not(target_arch = "wasm32"))] #[cfg(not(target_arch = "wasm32"))]
pub fn open(path: impl AsRef<Path>) -> io::Result<Self> { pub fn open(path: impl AsRef<Path>) -> io::Result<Self> {
Ok(Self { Ok(Self {
storage: Storage::open(path.as_ref())?, storage: Storage::open(path.as_ref(), false)?,
}) })
} }
@ -571,6 +571,45 @@ impl Store {
pub fn optimize(&self) -> io::Result<()> { pub fn optimize(&self) -> io::Result<()> {
self.storage.compact() self.storage.compact()
} }
/// Creates a store efficiently from a dataset file.
///
/// The storage located at `path` is opened, the content of `reader` is loaded into it
/// and the storage is then flushed and compacted.
///
/// Warning: This function is optimized for performance and does not save the triples atomically.
/// If the parsing fails in the middle of the file, only a part of it may be written to the store.
///
/// Usage example:
/// ```
/// use oxigraph::store::Store;
/// use oxigraph::io::DatasetFormat;
/// use oxigraph::model::*;
///
/// // insertion
/// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> .";
/// Store::create_from_dataset("example.db", file.as_ref(), DatasetFormat::NQuads, None)?;
///
/// // we inspect the store contents
/// let store = Store::open("example.db")?;
/// let ex = NamedNodeRef::new("http://example.com")?;
/// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?);
/// # Result::<_,Box<dyn std::error::Error>>::Ok(())
/// ```
///
/// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind.
/// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds.
/// Errors related to data loading into the store use the other error kinds.
pub fn create_from_dataset(
    path: impl AsRef<Path>,
    reader: impl BufRead,
    format: DatasetFormat,
    base_iri: Option<&str>,
) -> io::Result<()> {
    // NOTE(review): the storage is opened with the bulk-load flag disabled (`false`);
    // presumably deliberate while the loader still does many lookups — confirm before enabling.
    let storage = Storage::open(path.as_ref(), false)?;
    load_dataset(&storage, reader, format, base_iri)?;
    // Flush, then compact, so the data is written out before the compaction pass runs.
    storage.flush()?;
    storage.compact()
}
} }
impl fmt::Display for Store { impl fmt::Display for Store {

Loading…
Cancel
Save