From 986d3e60bb150ad7dcd5f2ee0c273b5a9e2229d6 Mon Sep 17 00:00:00 2001 From: Tpt Date: Fri, 5 Nov 2021 19:35:22 +0100 Subject: [PATCH] RocksDB: bulk load TODO: we do a lot of lookups during load for GC, so the option is not very useful --- lib/benches/store.rs | 9 ++++++- lib/src/storage/backend/rocksdb.rs | 24 ++++++++++++++--- lib/src/storage/mod.rs | 4 +-- lib/src/store.rs | 41 +++++++++++++++++++++++++++++- 4 files changed, 71 insertions(+), 7 deletions(-) diff --git a/lib/benches/store.rs b/lib/benches/store.rs index 1d43ff96..b6a30981 100644 --- a/lib/benches/store.rs +++ b/lib/benches/store.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use oxhttp::model::{Method, Request, Status}; -use oxigraph::io::GraphFormat; +use oxigraph::io::{DatasetFormat, GraphFormat}; use oxigraph::model::GraphNameRef; use oxigraph::sparql::{Query, QueryResults, Update}; use oxigraph::store::Store; @@ -32,6 +32,13 @@ fn store_load(c: &mut Criterion) { do_load(&store, &data); }) }); + group.bench_function("load BSBM explore 1000 in on disk with bulk load", |b| { + b.iter(|| { + let path = TempDir::default(); + Store::create_from_dataset(&path.0, Cursor::new(&data), DatasetFormat::NQuads, None) + .unwrap(); + }) + }); } fn do_load(store: &Store, data: &[u8]) { diff --git a/lib/src/storage/backend/rocksdb.rs b/lib/src/storage/backend/rocksdb.rs index 151c9969..49b97b6d 100644 --- a/lib/src/storage/backend/rocksdb.rs +++ b/lib/src/storage/backend/rocksdb.rs @@ -95,17 +95,32 @@ impl Db { temp_dir() } .join("oxigraph-temp-rocksdb"); - Ok(Self(Arc::new(Self::do_open(&path, column_families, true)?))) + Ok(Self(Arc::new(Self::do_open( + &path, + column_families, + true, + false, + )?))) } - pub fn open(path: &Path, column_families: Vec) -> Result { - Ok(Self(Arc::new(Self::do_open(path, column_families, false)?))) + pub fn open( + path: &Path, + column_families: Vec, + for_bulk_load: bool, + ) -> Result { + Ok(Self(Arc::new(Self::do_open( + path, 
+ column_families, + false, + for_bulk_load, + )?))) } fn do_open( path: &Path, mut column_families: Vec, in_memory: bool, + for_bulk_load: bool, ) -> Result { let c_path = CString::new( path.to_str() @@ -134,6 +149,9 @@ impl Db { .try_into() .unwrap(), ); + if for_bulk_load { + rocksdb_options_prepare_for_bulk_load(options); + } let env = if in_memory { let env = rocksdb_create_mem_env(); diff --git a/lib/src/storage/mod.rs b/lib/src/storage/mod.rs index 30f496d3..c73fd06d 100644 --- a/lib/src/storage/mod.rs +++ b/lib/src/storage/mod.rs @@ -61,8 +61,8 @@ impl Storage { } #[cfg(not(target_arch = "wasm32"))] - pub fn open(path: &Path) -> Result { - Self::setup(Db::open(path, Self::column_families())?) + pub fn open(path: &Path, for_bulk_load: bool) -> Result { + Self::setup(Db::open(path, Self::column_families(), for_bulk_load)?) } fn column_families() -> Vec { diff --git a/lib/src/store.rs b/lib/src/store.rs index f371a6c8..dc343967 100644 --- a/lib/src/store.rs +++ b/lib/src/store.rs @@ -88,7 +88,7 @@ impl Store { #[cfg(not(target_arch = "wasm32"))] pub fn open(path: impl AsRef) -> io::Result { Ok(Self { - storage: Storage::open(path.as_ref())?, + storage: Storage::open(path.as_ref(), false)?, }) } @@ -571,6 +571,45 @@ impl Store { pub fn optimize(&self) -> io::Result<()> { self.storage.compact() } + + /// Creates a store at `path` efficiently from a dataset file, then flushes and compacts it. + /// + /// Warning: This function is optimized for performance and saves the triples in a non-atomic way. + /// If the parsing fails in the middle of the file, only a part of it may be written to the store. 
+ /// + /// Usage example: + /// ``` + /// use oxigraph::store::Store; + /// use oxigraph::io::DatasetFormat; + /// use oxigraph::model::*; + /// + /// let store = Store::new()?; + /// + /// // insertion + /// let file = b"<http://example.com> <http://example.com> <http://example.com> <http://example.com> ."; + /// Store::create_from_dataset("example.db", file.as_ref(), DatasetFormat::NQuads, None)?; + /// + /// // we inspect the store contents + /// let store = Store::open("example.db")?; + /// let ex = NamedNodeRef::new("http://example.com")?; + /// assert!(store.contains(QuadRef::new(ex, ex, ex, ex))?); + /// # Result::<_,Box<dyn std::error::Error>>::Ok(()) + /// ``` + /// + /// Errors related to parameter validation like the base IRI use the [`InvalidInput`](std::io::ErrorKind::InvalidInput) error kind. + /// Errors related to a bad syntax in the loaded file use the [`InvalidData`](std::io::ErrorKind::InvalidData) or [`UnexpectedEof`](std::io::ErrorKind::UnexpectedEof) error kinds. + /// Errors related to data loading into the store use the other error kinds. + pub fn create_from_dataset( + path: impl AsRef<Path>, + reader: impl BufRead, + format: DatasetFormat, + base_iri: Option<&str>, + ) -> io::Result<()> { + let storage = Storage::open(path.as_ref(), false)?; // NOTE(review): for_bulk_load is off + load_dataset(&storage, reader, format, base_iri)?; + storage.flush()?; + storage.compact() + } } impl fmt::Display for Store {