Fork of https://github.com/oxigraph/oxigraph.git for the purpose of the NextGraph project
You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1554 lines
53 KiB
1554 lines
53 KiB
#![allow(clippy::same_name_method)]
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use crate::model::Quad;
|
|
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
|
|
use crate::storage::backend::{Reader, Transaction};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use crate::storage::binary_encoder::LATEST_STORAGE_VERSION;
|
|
use crate::storage::binary_encoder::{
|
|
decode_term, encode_term, encode_term_pair, encode_term_quad, encode_term_triple,
|
|
write_gosp_quad, write_gpos_quad, write_gspo_quad, write_osp_quad, write_ospg_quad,
|
|
write_pos_quad, write_posg_quad, write_spo_quad, write_spog_quad, write_term, QuadEncoding,
|
|
WRITTEN_TERM_MAX_SIZE,
|
|
};
|
|
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use crate::storage::numeric_encoder::Decoder;
|
|
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
|
|
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::collections::VecDeque;
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::collections::{HashMap, HashSet};
|
|
use std::error::Error;
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::mem::{swap, take};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::path::{Path, PathBuf};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
use std::{io, thread};
|
|
|
|
mod backend;
|
|
mod binary_encoder;
|
|
mod error;
|
|
pub mod numeric_encoder;
|
|
pub mod small_string;
|
|
|
|
// Names of the column families used by the storage layer.

/// Dictionary column family: maps string hashes to their string values.
const ID2STR_CF: &str = "id2str";
/// Named-graph quads keyed by subject, predicate, object, graph.
const SPOG_CF: &str = "spog";
/// Named-graph quads keyed by predicate, object, subject, graph.
const POSG_CF: &str = "posg";
/// Named-graph quads keyed by object, subject, predicate, graph.
const OSPG_CF: &str = "ospg";
/// Named-graph quads keyed by graph, subject, predicate, object.
const GSPO_CF: &str = "gspo";
/// Named-graph quads keyed by graph, predicate, object, subject.
const GPOS_CF: &str = "gpos";
/// Named-graph quads keyed by graph, object, subject, predicate.
const GOSP_CF: &str = "gosp";
/// Default-graph triples keyed by subject, predicate, object.
const DSPO_CF: &str = "dspo";
/// Default-graph triples keyed by predicate, object, subject.
const DPOS_CF: &str = "dpos";
/// Default-graph triples keyed by object, subject, predicate.
const DOSP_CF: &str = "dosp";
/// Set of named graphs known to the store (encoded graph name -> empty value).
const GRAPHS_CF: &str = "graphs";
/// Backend default column family; holds the `oxversion` storage version key.
#[cfg(not(target_family = "wasm"))]
const DEFAULT_CF: &str = "default";
/// Default batch size for bulk loading.
/// NOTE(review): unit (quads vs. bytes) is not visible in this chunk — confirm at the use site.
#[cfg(not(target_family = "wasm"))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
|
|
|
|
/// Low level storage primitives
///
/// Bundles the key-value database handle with a handle to every column family
/// used by the RDF store. Default-graph triples are indexed three ways
/// (`dspo`/`dpos`/`dosp`) and named-graph quads six ways
/// (`spog`/`posg`/`ospg`/`gspo`/`gpos`/`gosp`).
#[derive(Clone)]
pub struct Storage {
    /// Underlying key-value database.
    db: Db,
    /// Backend default column family; stores the `oxversion` key.
    #[cfg(not(target_family = "wasm"))]
    default_cf: ColumnFamily,
    /// String dictionary: big-endian `StrHash` bytes -> UTF-8 string.
    id2str_cf: ColumnFamily,
    /// Named-graph index ordered subject, predicate, object, graph.
    spog_cf: ColumnFamily,
    /// Named-graph index ordered predicate, object, subject, graph.
    posg_cf: ColumnFamily,
    /// Named-graph index ordered object, subject, predicate, graph.
    ospg_cf: ColumnFamily,
    /// Named-graph index ordered graph, subject, predicate, object.
    gspo_cf: ColumnFamily,
    /// Named-graph index ordered graph, predicate, object, subject.
    gpos_cf: ColumnFamily,
    /// Named-graph index ordered graph, object, subject, predicate.
    gosp_cf: ColumnFamily,
    /// Default-graph index ordered subject, predicate, object.
    dspo_cf: ColumnFamily,
    /// Default-graph index ordered predicate, object, subject.
    dpos_cf: ColumnFamily,
    /// Default-graph index ordered object, subject, predicate.
    dosp_cf: ColumnFamily,
    /// Named graphs registered in the store (encoded graph name -> empty value).
    graphs_cf: ColumnFamily,
}
|
|
|
|
impl Storage {
|
|
pub fn new() -> Result<Self, StorageError> {
|
|
Self::setup(Db::new(Self::column_families())?)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn open(path: &Path) -> Result<Self, StorageError> {
|
|
Self::setup(Db::open_read_write(Some(path), Self::column_families())?)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
|
|
Self::setup(Db::open_secondary(
|
|
primary_path,
|
|
None,
|
|
Self::column_families(),
|
|
)?)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn open_persistent_secondary(
|
|
primary_path: &Path,
|
|
secondary_path: &Path,
|
|
) -> Result<Self, StorageError> {
|
|
Self::setup(Db::open_secondary(
|
|
primary_path,
|
|
Some(secondary_path),
|
|
Self::column_families(),
|
|
)?)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn open_read_only(path: &Path) -> Result<Self, StorageError> {
|
|
Self::setup(Db::open_read_only(path, Self::column_families())?)
|
|
}
|
|
|
|
fn column_families() -> Vec<ColumnFamilyDefinition> {
|
|
vec![
|
|
ColumnFamilyDefinition {
|
|
name: ID2STR_CF,
|
|
use_iter: false,
|
|
min_prefix_size: 0,
|
|
unordered_writes: true,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: SPOG_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: POSG_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: OSPG_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 0, // There are small literals...
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: GSPO_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: GPOS_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: GOSP_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: DSPO_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: DPOS_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: DOSP_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 0, // There are small literals...
|
|
unordered_writes: false,
|
|
},
|
|
ColumnFamilyDefinition {
|
|
name: GRAPHS_CF,
|
|
use_iter: true,
|
|
min_prefix_size: 17, // named or blank node start
|
|
unordered_writes: false,
|
|
},
|
|
]
|
|
}
|
|
|
|
fn setup(db: Db) -> Result<Self, StorageError> {
|
|
let this = Self {
|
|
#[cfg(not(target_family = "wasm"))]
|
|
default_cf: db.column_family(DEFAULT_CF)?,
|
|
id2str_cf: db.column_family(ID2STR_CF)?,
|
|
spog_cf: db.column_family(SPOG_CF)?,
|
|
posg_cf: db.column_family(POSG_CF)?,
|
|
ospg_cf: db.column_family(OSPG_CF)?,
|
|
gspo_cf: db.column_family(GSPO_CF)?,
|
|
gpos_cf: db.column_family(GPOS_CF)?,
|
|
gosp_cf: db.column_family(GOSP_CF)?,
|
|
dspo_cf: db.column_family(DSPO_CF)?,
|
|
dpos_cf: db.column_family(DPOS_CF)?,
|
|
dosp_cf: db.column_family(DOSP_CF)?,
|
|
graphs_cf: db.column_family(GRAPHS_CF)?,
|
|
db,
|
|
};
|
|
#[cfg(not(target_family = "wasm"))]
|
|
this.migrate()?;
|
|
Ok(this)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
fn migrate(&self) -> Result<(), StorageError> {
|
|
let mut version = self.ensure_version()?;
|
|
if version == 0 {
|
|
// We migrate to v1
|
|
let mut graph_names = HashSet::new();
|
|
for quad in self.snapshot().quads() {
|
|
let quad = quad?;
|
|
if !quad.graph_name.is_default_graph() {
|
|
graph_names.insert(quad.graph_name);
|
|
}
|
|
}
|
|
let mut graph_names = graph_names
|
|
.into_iter()
|
|
.map(|g| encode_term(&g))
|
|
.collect::<Vec<_>>();
|
|
graph_names.sort_unstable();
|
|
let mut stt_file = self.db.new_sst_file()?;
|
|
for k in graph_names {
|
|
stt_file.insert_empty(&k)?;
|
|
}
|
|
self.db
|
|
.insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?;
|
|
version = 1;
|
|
self.update_version(version)?;
|
|
}
|
|
|
|
match version {
|
|
_ if version < LATEST_STORAGE_VERSION => Err(CorruptionError::msg(format!(
|
|
"The RocksDB database is using the outdated encoding version {version}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version"
|
|
|
|
)).into()),
|
|
LATEST_STORAGE_VERSION => Ok(()),
|
|
_ => Err(CorruptionError::msg(format!(
|
|
"The RocksDB database is using the too recent version {version}. Upgrade to the latest Oxigraph version to load this database"
|
|
|
|
)).into())
|
|
}
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
fn ensure_version(&self) -> Result<u64, StorageError> {
|
|
Ok(
|
|
if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
|
|
u64::from_be_bytes(version.as_ref().try_into().map_err(|e| {
|
|
CorruptionError::new(format!("Error while parsing the version key: {e}"))
|
|
})?)
|
|
} else {
|
|
self.update_version(LATEST_STORAGE_VERSION)?;
|
|
LATEST_STORAGE_VERSION
|
|
},
|
|
)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
fn update_version(&self, version: u64) -> Result<(), StorageError> {
|
|
self.db
|
|
.insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
|
|
self.db.flush(&self.default_cf)
|
|
}
|
|
|
|
pub fn snapshot(&self) -> StorageReader {
|
|
StorageReader {
|
|
reader: self.db.snapshot(),
|
|
storage: self.clone(),
|
|
}
|
|
}
|
|
|
|
pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
|
|
&'b self,
|
|
f: impl Fn(StorageWriter<'a>) -> Result<T, E>,
|
|
) -> Result<T, E> {
|
|
self.db.transaction(|transaction| {
|
|
f(StorageWriter {
|
|
buffer: Vec::new(),
|
|
transaction,
|
|
storage: self,
|
|
})
|
|
})
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn flush(&self) -> Result<(), StorageError> {
|
|
self.db.flush(&self.default_cf)?;
|
|
self.db.flush(&self.gspo_cf)?;
|
|
self.db.flush(&self.gpos_cf)?;
|
|
self.db.flush(&self.gosp_cf)?;
|
|
self.db.flush(&self.spog_cf)?;
|
|
self.db.flush(&self.posg_cf)?;
|
|
self.db.flush(&self.ospg_cf)?;
|
|
self.db.flush(&self.dspo_cf)?;
|
|
self.db.flush(&self.dpos_cf)?;
|
|
self.db.flush(&self.dosp_cf)?;
|
|
self.db.flush(&self.id2str_cf)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn compact(&self) -> Result<(), StorageError> {
|
|
self.db.compact(&self.default_cf)?;
|
|
self.db.compact(&self.gspo_cf)?;
|
|
self.db.compact(&self.gpos_cf)?;
|
|
self.db.compact(&self.gosp_cf)?;
|
|
self.db.compact(&self.spog_cf)?;
|
|
self.db.compact(&self.posg_cf)?;
|
|
self.db.compact(&self.ospg_cf)?;
|
|
self.db.compact(&self.dspo_cf)?;
|
|
self.db.compact(&self.dpos_cf)?;
|
|
self.db.compact(&self.dosp_cf)?;
|
|
self.db.compact(&self.id2str_cf)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
|
|
self.db.backup(target_directory)
|
|
}
|
|
}
|
|
|
|
/// Read access to the store through a backend `Reader`.
///
/// The reader is either a database snapshot (`Storage::snapshot`) or the reader
/// of an ongoing transaction (`StorageWriter::reader`).
pub struct StorageReader {
    /// Backend reader used for index scans and lookups.
    reader: Reader,
    /// Owned handle on the storage, giving access to the column family handles.
    storage: Storage,
}
|
|
|
|
impl StorageReader {
|
|
pub fn len(&self) -> Result<usize, StorageError> {
|
|
Ok(self.reader.len(&self.storage.gspo_cf)? + self.reader.len(&self.storage.dspo_cf)?)
|
|
}
|
|
|
|
pub fn is_empty(&self) -> Result<bool, StorageError> {
|
|
Ok(self.reader.is_empty(&self.storage.gspo_cf)?
|
|
&& self.reader.is_empty(&self.storage.dspo_cf)?)
|
|
}
|
|
|
|
pub fn contains(&self, quad: &EncodedQuad) -> Result<bool, StorageError> {
|
|
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
|
|
if quad.graph_name.is_default_graph() {
|
|
write_spo_quad(&mut buffer, quad);
|
|
Ok(self.reader.contains_key(&self.storage.dspo_cf, &buffer)?)
|
|
} else {
|
|
write_gspo_quad(&mut buffer, quad);
|
|
Ok(self.reader.contains_key(&self.storage.gspo_cf, &buffer)?)
|
|
}
|
|
}
|
|
|
|
pub fn quads_for_pattern(
|
|
&self,
|
|
subject: Option<&EncodedTerm>,
|
|
predicate: Option<&EncodedTerm>,
|
|
object: Option<&EncodedTerm>,
|
|
graph_name: Option<&EncodedTerm>,
|
|
) -> ChainedDecodingQuadIterator {
|
|
match subject {
|
|
Some(subject) => match predicate {
|
|
Some(predicate) => match object {
|
|
Some(object) => match graph_name {
|
|
Some(graph_name) => self.quads_for_subject_predicate_object_graph(
|
|
subject, predicate, object, graph_name,
|
|
),
|
|
None => self.quads_for_subject_predicate_object(subject, predicate, object),
|
|
},
|
|
None => match graph_name {
|
|
Some(graph_name) => {
|
|
self.quads_for_subject_predicate_graph(subject, predicate, graph_name)
|
|
}
|
|
None => self.quads_for_subject_predicate(subject, predicate),
|
|
},
|
|
},
|
|
None => match object {
|
|
Some(object) => match graph_name {
|
|
Some(graph_name) => {
|
|
self.quads_for_subject_object_graph(subject, object, graph_name)
|
|
}
|
|
None => self.quads_for_subject_object(subject, object),
|
|
},
|
|
None => match graph_name {
|
|
Some(graph_name) => self.quads_for_subject_graph(subject, graph_name),
|
|
None => self.quads_for_subject(subject),
|
|
},
|
|
},
|
|
},
|
|
None => match predicate {
|
|
Some(predicate) => match object {
|
|
Some(object) => match graph_name {
|
|
Some(graph_name) => {
|
|
self.quads_for_predicate_object_graph(predicate, object, graph_name)
|
|
}
|
|
None => self.quads_for_predicate_object(predicate, object),
|
|
},
|
|
None => match graph_name {
|
|
Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name),
|
|
None => self.quads_for_predicate(predicate),
|
|
},
|
|
},
|
|
None => match object {
|
|
Some(object) => match graph_name {
|
|
Some(graph_name) => self.quads_for_object_graph(object, graph_name),
|
|
None => self.quads_for_object(object),
|
|
},
|
|
None => match graph_name {
|
|
Some(graph_name) => self.quads_for_graph(graph_name),
|
|
None => self.quads(),
|
|
},
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
pub fn quads(&self) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(self.dspo_quads(&[]), self.gspo_quads(&[]))
|
|
}
|
|
|
|
fn quads_in_named_graph(&self) -> DecodingQuadIterator {
|
|
self.gspo_quads(&[])
|
|
}
|
|
|
|
fn quads_for_subject(&self, subject: &EncodedTerm) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dspo_quads(&encode_term(subject)),
|
|
self.spog_quads(&encode_term(subject)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_subject_predicate(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
predicate: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dspo_quads(&encode_term_pair(subject, predicate)),
|
|
self.spog_quads(&encode_term_pair(subject, predicate)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_subject_predicate_object(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
predicate: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dspo_quads(&encode_term_triple(subject, predicate, object)),
|
|
self.spog_quads(&encode_term_triple(subject, predicate, object)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_subject_object(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dosp_quads(&encode_term_pair(object, subject)),
|
|
self.ospg_quads(&encode_term_pair(object, subject)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_predicate(&self, predicate: &EncodedTerm) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dpos_quads(&encode_term(predicate)),
|
|
self.posg_quads(&encode_term(predicate)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_predicate_object(
|
|
&self,
|
|
predicate: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dpos_quads(&encode_term_pair(predicate, object)),
|
|
self.posg_quads(&encode_term_pair(predicate, object)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_object(&self, object: &EncodedTerm) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::pair(
|
|
self.dosp_quads(&encode_term(object)),
|
|
self.ospg_quads(&encode_term(object)),
|
|
)
|
|
}
|
|
|
|
fn quads_for_graph(&self, graph_name: &EncodedTerm) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dspo_quads(&Vec::default())
|
|
} else {
|
|
self.gspo_quads(&encode_term(graph_name))
|
|
})
|
|
}
|
|
|
|
fn quads_for_subject_graph(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dspo_quads(&encode_term(subject))
|
|
} else {
|
|
self.gspo_quads(&encode_term_pair(graph_name, subject))
|
|
})
|
|
}
|
|
|
|
fn quads_for_subject_predicate_graph(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
predicate: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dspo_quads(&encode_term_pair(subject, predicate))
|
|
} else {
|
|
self.gspo_quads(&encode_term_triple(graph_name, subject, predicate))
|
|
})
|
|
}
|
|
|
|
fn quads_for_subject_predicate_object_graph(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
predicate: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dspo_quads(&encode_term_triple(subject, predicate, object))
|
|
} else {
|
|
self.gspo_quads(&encode_term_quad(graph_name, subject, predicate, object))
|
|
})
|
|
}
|
|
|
|
fn quads_for_subject_object_graph(
|
|
&self,
|
|
subject: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dosp_quads(&encode_term_pair(object, subject))
|
|
} else {
|
|
self.gosp_quads(&encode_term_triple(graph_name, object, subject))
|
|
})
|
|
}
|
|
|
|
fn quads_for_predicate_graph(
|
|
&self,
|
|
predicate: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dpos_quads(&encode_term(predicate))
|
|
} else {
|
|
self.gpos_quads(&encode_term_pair(graph_name, predicate))
|
|
})
|
|
}
|
|
|
|
fn quads_for_predicate_object_graph(
|
|
&self,
|
|
predicate: &EncodedTerm,
|
|
object: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dpos_quads(&encode_term_pair(predicate, object))
|
|
} else {
|
|
self.gpos_quads(&encode_term_triple(graph_name, predicate, object))
|
|
})
|
|
}
|
|
|
|
fn quads_for_object_graph(
|
|
&self,
|
|
object: &EncodedTerm,
|
|
graph_name: &EncodedTerm,
|
|
) -> ChainedDecodingQuadIterator {
|
|
ChainedDecodingQuadIterator::new(if graph_name.is_default_graph() {
|
|
self.dosp_quads(&encode_term(object))
|
|
} else {
|
|
self.gosp_quads(&encode_term_pair(graph_name, object))
|
|
})
|
|
}
|
|
|
|
pub fn named_graphs(&self) -> DecodingGraphIterator {
|
|
DecodingGraphIterator {
|
|
iter: self.reader.iter(&self.storage.graphs_cf).unwrap(), //TODO: propagate error?
|
|
}
|
|
}
|
|
|
|
pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool, StorageError> {
|
|
self.reader
|
|
.contains_key(&self.storage.graphs_cf, &encode_term(graph_name))
|
|
}
|
|
|
|
fn spog_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.spog_cf, prefix, QuadEncoding::Spog)
|
|
}
|
|
|
|
fn posg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.posg_cf, prefix, QuadEncoding::Posg)
|
|
}
|
|
|
|
fn ospg_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.ospg_cf, prefix, QuadEncoding::Ospg)
|
|
}
|
|
|
|
fn gspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.gspo_cf, prefix, QuadEncoding::Gspo)
|
|
}
|
|
|
|
fn gpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.gpos_cf, prefix, QuadEncoding::Gpos)
|
|
}
|
|
|
|
fn gosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.gosp_cf, prefix, QuadEncoding::Gosp)
|
|
}
|
|
|
|
fn dspo_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.dspo_cf, prefix, QuadEncoding::Dspo)
|
|
}
|
|
|
|
fn dpos_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.dpos_cf, prefix, QuadEncoding::Dpos)
|
|
}
|
|
|
|
fn dosp_quads(&self, prefix: &[u8]) -> DecodingQuadIterator {
|
|
self.inner_quads(&self.storage.dosp_cf, prefix, QuadEncoding::Dosp)
|
|
}
|
|
|
|
fn inner_quads(
|
|
&self,
|
|
column_family: &ColumnFamily,
|
|
prefix: &[u8],
|
|
encoding: QuadEncoding,
|
|
) -> DecodingQuadIterator {
|
|
DecodingQuadIterator {
|
|
iter: self.reader.scan_prefix(column_family, prefix).unwrap(), // TODO: propagate error?
|
|
encoding,
|
|
}
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
|
|
Ok(self
|
|
.storage
|
|
.db
|
|
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
|
|
.map(|v| String::from_utf8(v.into()))
|
|
.transpose()
|
|
.map_err(CorruptionError::new)?)
|
|
}
|
|
|
|
#[cfg(target_family = "wasm")]
|
|
pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
|
|
Ok(self
|
|
.reader
|
|
.get(&self.storage.id2str_cf, &key.to_be_bytes())?
|
|
.map(String::from_utf8)
|
|
.transpose()
|
|
.map_err(CorruptionError::new)?)
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
|
|
self.storage
|
|
.db
|
|
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
|
|
}
|
|
|
|
#[cfg(target_family = "wasm")]
|
|
pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
|
|
self.reader
|
|
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())
|
|
}
|
|
|
|
/// Validates that all the storage invariants held in the data
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub fn validate(&self) -> Result<(), StorageError> {
|
|
// triples
|
|
let dspo_size = self.dspo_quads(&[]).count();
|
|
if dspo_size != self.dpos_quads(&[]).count() || dspo_size != self.dosp_quads(&[]).count() {
|
|
return Err(CorruptionError::new(
|
|
"Not the same number of triples in dspo, dpos and dosp",
|
|
)
|
|
.into());
|
|
}
|
|
for spo in self.dspo_quads(&[]) {
|
|
let spo = spo?;
|
|
self.decode_quad(&spo)?; // We ensure that the quad is readable
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.dpos_cf,
|
|
&encode_term_triple(&spo.predicate, &spo.object, &spo.subject),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in dspo and not in dpos").into());
|
|
}
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.dosp_cf,
|
|
&encode_term_triple(&spo.object, &spo.subject, &spo.predicate),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in dspo and not in dpos").into());
|
|
}
|
|
}
|
|
|
|
// quads
|
|
let gspo_size = self.gspo_quads(&[]).count();
|
|
if gspo_size != self.gpos_quads(&[]).count()
|
|
|| gspo_size != self.gosp_quads(&[]).count()
|
|
|| gspo_size != self.spog_quads(&[]).count()
|
|
|| gspo_size != self.posg_quads(&[]).count()
|
|
|| gspo_size != self.ospg_quads(&[]).count()
|
|
{
|
|
return Err(CorruptionError::new(
|
|
"Not the same number of triples in dspo, dpos and dosp",
|
|
)
|
|
.into());
|
|
}
|
|
for gspo in self.gspo_quads(&[]) {
|
|
let gspo = gspo?;
|
|
self.decode_quad(&gspo)?; // We ensure that the quad is readable
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.gpos_cf,
|
|
&encode_term_quad(
|
|
&gspo.graph_name,
|
|
&gspo.predicate,
|
|
&gspo.object,
|
|
&gspo.subject,
|
|
),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in gspo and not in gpos").into());
|
|
}
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.gosp_cf,
|
|
&encode_term_quad(
|
|
&gspo.graph_name,
|
|
&gspo.object,
|
|
&gspo.subject,
|
|
&gspo.predicate,
|
|
),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in gspo and not in gosp").into());
|
|
}
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.spog_cf,
|
|
&encode_term_quad(
|
|
&gspo.subject,
|
|
&gspo.predicate,
|
|
&gspo.object,
|
|
&gspo.graph_name,
|
|
),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in gspo and not in spog").into());
|
|
}
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.posg_cf,
|
|
&encode_term_quad(
|
|
&gspo.predicate,
|
|
&gspo.object,
|
|
&gspo.subject,
|
|
&gspo.graph_name,
|
|
),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in gspo and not in posg").into());
|
|
}
|
|
if !self.storage.db.contains_key(
|
|
&self.storage.ospg_cf,
|
|
&encode_term_quad(
|
|
&gspo.object,
|
|
&gspo.subject,
|
|
&gspo.predicate,
|
|
&gspo.graph_name,
|
|
),
|
|
)? {
|
|
return Err(CorruptionError::new("Quad in gspo and not in ospg").into());
|
|
}
|
|
if !self
|
|
.storage
|
|
.db
|
|
.contains_key(&self.storage.graphs_cf, &encode_term(&gspo.graph_name))?
|
|
{
|
|
return Err(
|
|
CorruptionError::new("Quad graph name in gspo and not in graphs").into(),
|
|
);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Validates that all the storage invariants held in the data
|
|
#[cfg(target_family = "wasm")]
|
|
#[allow(clippy::unused_self, clippy::unnecessary_wraps)]
|
|
pub fn validate(&self) -> Result<(), StorageError> {
|
|
Ok(()) //TODO
|
|
}
|
|
}
|
|
|
|
/// Concatenation of one or two quad iterators: yields `first`'s items, then
/// `second`'s (if any).
pub struct ChainedDecodingQuadIterator {
    /// Iterator drained first.
    first: DecodingQuadIterator,
    /// Optional iterator consulted once `first` returns `None`.
    second: Option<DecodingQuadIterator>,
}
|
|
|
|
impl ChainedDecodingQuadIterator {
|
|
fn new(first: DecodingQuadIterator) -> Self {
|
|
Self {
|
|
first,
|
|
second: None,
|
|
}
|
|
}
|
|
|
|
fn pair(first: DecodingQuadIterator, second: DecodingQuadIterator) -> Self {
|
|
Self {
|
|
first,
|
|
second: Some(second),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Iterator for ChainedDecodingQuadIterator {
|
|
type Item = Result<EncodedQuad, StorageError>;
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
|
|
if let Some(result) = self.first.next() {
|
|
Some(result)
|
|
} else if let Some(second) = self.second.as_mut() {
|
|
second.next()
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Iterator decoding the keys of one quad index into `EncodedQuad`s.
pub struct DecodingQuadIterator {
    /// Backend cursor over the index keys.
    iter: Iter,
    /// Term order of the scanned index, used to decode each key.
    encoding: QuadEncoding,
}
|
|
|
|
impl Iterator for DecodingQuadIterator {
|
|
type Item = Result<EncodedQuad, StorageError>;
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
|
|
if let Err(e) = self.iter.status() {
|
|
return Some(Err(e));
|
|
}
|
|
let term = self.encoding.decode(self.iter.key()?);
|
|
self.iter.next();
|
|
Some(term)
|
|
}
|
|
}
|
|
|
|
/// Iterator decoding the keys of the `graphs` column family into graph names.
pub struct DecodingGraphIterator {
    /// Backend cursor over the `graphs` keys.
    iter: Iter,
}
|
|
|
|
impl Iterator for DecodingGraphIterator {
|
|
type Item = Result<EncodedTerm, StorageError>;
|
|
|
|
fn next(&mut self) -> Option<Result<EncodedTerm, StorageError>> {
|
|
if let Err(e) = self.iter.status() {
|
|
return Some(Err(e));
|
|
}
|
|
let term = decode_term(self.iter.key()?);
|
|
self.iter.next();
|
|
Some(term)
|
|
}
|
|
}
|
|
|
|
/// Exposes the reader's string dictionary through the `StrLookup` trait.
///
/// The trait methods have the same names as `StorageReader`'s inherent methods.
/// Inside these bodies, `self.get_str(..)` / `self.contains_str(..)` resolve to the
/// inherent methods, because inherent methods take precedence over trait methods.
/// The calls are therefore plain delegation, not recursion.
/// NOTE(review): this shadowing is presumably why the file allows `clippy::same_name_method`.
impl StrLookup for StorageReader {
    fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
        self.get_str(key)
    }

    fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
        self.contains_str(key)
    }
}
|
|
|
|
/// Write access to the store inside a database transaction.
pub struct StorageWriter<'a> {
    /// Scratch buffer reused to build index keys; cleared before each key is written.
    buffer: Vec<u8>,
    /// Backend transaction through which index reads and writes go.
    transaction: Transaction<'a>,
    /// Storage whose column families are written.
    storage: &'a Storage,
}
|
|
|
|
impl<'a> StorageWriter<'a> {
|
|
pub fn reader(&self) -> StorageReader {
|
|
StorageReader {
|
|
reader: self.transaction.reader(),
|
|
storage: self.storage.clone(),
|
|
}
|
|
}
|
|
|
|
pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
|
|
let encoded = quad.into();
|
|
self.buffer.clear();
|
|
let result = if quad.graph_name.is_default_graph() {
|
|
write_spo_quad(&mut self.buffer, &encoded);
|
|
if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
|
|
{
|
|
false
|
|
} else {
|
|
self.transaction
|
|
.insert_empty(&self.storage.dspo_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_pos_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.dpos_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_osp_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.dosp_cf, &self.buffer)?;
|
|
|
|
self.insert_term(quad.subject.into(), &encoded.subject)?;
|
|
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
|
|
self.insert_term(quad.object, &encoded.object)?;
|
|
true
|
|
}
|
|
} else {
|
|
write_spog_quad(&mut self.buffer, &encoded);
|
|
if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
|
|
{
|
|
false
|
|
} else {
|
|
self.transaction
|
|
.insert_empty(&self.storage.spog_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_posg_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.posg_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_ospg_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.ospg_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gspo_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.gspo_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gpos_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.gpos_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gosp_quad(&mut self.buffer, &encoded);
|
|
self.transaction
|
|
.insert_empty(&self.storage.gosp_cf, &self.buffer)?;
|
|
|
|
self.insert_term(quad.subject.into(), &encoded.subject)?;
|
|
self.insert_term(quad.predicate.into(), &encoded.predicate)?;
|
|
self.insert_term(quad.object, &encoded.object)?;
|
|
|
|
self.buffer.clear();
|
|
write_term(&mut self.buffer, &encoded.graph_name);
|
|
if !self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
|
|
{
|
|
self.transaction
|
|
.insert_empty(&self.storage.graphs_cf, &self.buffer)?;
|
|
self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
|
|
}
|
|
true
|
|
}
|
|
};
|
|
Ok(result)
|
|
}
|
|
|
|
pub fn insert_named_graph(
|
|
&mut self,
|
|
graph_name: NamedOrBlankNodeRef<'_>,
|
|
) -> Result<bool, StorageError> {
|
|
let encoded_graph_name = graph_name.into();
|
|
|
|
self.buffer.clear();
|
|
write_term(&mut self.buffer, &encoded_graph_name);
|
|
let result = if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
|
|
{
|
|
false
|
|
} else {
|
|
self.transaction
|
|
.insert_empty(&self.storage.graphs_cf, &self.buffer)?;
|
|
self.insert_term(graph_name.into(), &encoded_graph_name)?;
|
|
true
|
|
};
|
|
Ok(result)
|
|
}
|
|
|
|
fn insert_term(
|
|
&mut self,
|
|
term: TermRef<'_>,
|
|
encoded: &EncodedTerm,
|
|
) -> Result<(), StorageError> {
|
|
insert_term(term, encoded, &mut |key, value| self.insert_str(key, value))
|
|
}
|
|
|
|
fn insert_graph_name(
|
|
&mut self,
|
|
graph_name: GraphNameRef<'_>,
|
|
encoded: &EncodedTerm,
|
|
) -> Result<(), StorageError> {
|
|
match graph_name {
|
|
GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
|
|
GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
|
|
GraphNameRef::DefaultGraph => Ok(()),
|
|
}
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
|
|
if self
|
|
.storage
|
|
.db
|
|
.contains_key(&self.storage.id2str_cf, &key.to_be_bytes())?
|
|
{
|
|
return Ok(());
|
|
}
|
|
self.storage.db.insert(
|
|
&self.storage.id2str_cf,
|
|
&key.to_be_bytes(),
|
|
value.as_bytes(),
|
|
)
|
|
}
|
|
|
|
#[cfg(target_family = "wasm")]
/// Stores `value` under `key` in the dictionary, through the transaction.
fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
    let raw_key = key.to_be_bytes();
    self.transaction
        .insert(&self.storage.id2str_cf, &raw_key, value.as_bytes())
}
|
|
|
|
pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
|
|
self.remove_encoded(&quad.into())
|
|
}
|
|
|
|
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<bool, StorageError> {
|
|
self.buffer.clear();
|
|
let result = if quad.graph_name.is_default_graph() {
|
|
write_spo_quad(&mut self.buffer, quad);
|
|
|
|
if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
|
|
{
|
|
self.transaction
|
|
.remove(&self.storage.dspo_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_pos_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.dpos_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_osp_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.dosp_cf, &self.buffer)?;
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
} else {
|
|
write_spog_quad(&mut self.buffer, quad);
|
|
|
|
if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
|
|
{
|
|
self.transaction
|
|
.remove(&self.storage.spog_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_posg_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.posg_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_ospg_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.ospg_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gspo_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.gspo_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gpos_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.gpos_cf, &self.buffer)?;
|
|
|
|
self.buffer.clear();
|
|
write_gosp_quad(&mut self.buffer, quad);
|
|
self.transaction
|
|
.remove(&self.storage.gosp_cf, &self.buffer)?;
|
|
true
|
|
} else {
|
|
false
|
|
}
|
|
};
|
|
Ok(result)
|
|
}
|
|
|
|
pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> {
|
|
if graph_name.is_default_graph() {
|
|
for quad in self.reader().quads_for_graph(&EncodedTerm::DefaultGraph) {
|
|
self.remove_encoded(&quad?)?;
|
|
}
|
|
} else {
|
|
self.buffer.clear();
|
|
write_term(&mut self.buffer, &graph_name.into());
|
|
if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
|
|
{
|
|
// The condition is useful to lock the graph itself and ensure no quad is inserted at the same time
|
|
for quad in self.reader().quads_for_graph(&graph_name.into()) {
|
|
self.remove_encoded(&quad?)?;
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Removes every quad returned by the reader's `quads_in_named_graph()`.
///
/// The named graphs themselves stay registered: only `remove_encoded` is called.
pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> {
    let reader = self.reader();
    for encoded_quad in reader.quads_in_named_graph() {
        let encoded_quad = encoded_quad?;
        self.remove_encoded(&encoded_quad)?;
    }
    Ok(())
}
|
|
|
|
/// Removes every quad returned by the reader's `quads()`.
///
/// Named graphs stay registered: only `remove_encoded` is called.
pub fn clear_all_graphs(&mut self) -> Result<(), StorageError> {
    let reader = self.reader();
    for encoded_quad in reader.quads() {
        let encoded_quad = encoded_quad?;
        self.remove_encoded(&encoded_quad)?;
    }
    Ok(())
}
|
|
|
|
pub fn remove_named_graph(
|
|
&mut self,
|
|
graph_name: NamedOrBlankNodeRef<'_>,
|
|
) -> Result<bool, StorageError> {
|
|
self.remove_encoded_named_graph(&graph_name.into())
|
|
}
|
|
|
|
fn remove_encoded_named_graph(
|
|
&mut self,
|
|
graph_name: &EncodedTerm,
|
|
) -> Result<bool, StorageError> {
|
|
self.buffer.clear();
|
|
write_term(&mut self.buffer, graph_name);
|
|
let result = if self
|
|
.transaction
|
|
.contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
|
|
{
|
|
// The condition is done ASAP to lock the graph itself
|
|
for quad in self.reader().quads_for_graph(graph_name) {
|
|
self.remove_encoded(&quad?)?;
|
|
}
|
|
self.buffer.clear();
|
|
write_term(&mut self.buffer, graph_name);
|
|
self.transaction
|
|
.remove(&self.storage.graphs_cf, &self.buffer)?;
|
|
true
|
|
} else {
|
|
false
|
|
};
|
|
Ok(result)
|
|
}
|
|
|
|
/// Removes every named graph listed by the reader, along with their quads.
pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> {
    let reader = self.reader();
    for graph in reader.named_graphs() {
        let graph = graph?;
        self.remove_encoded_named_graph(&graph)?;
    }
    Ok(())
}
|
|
|
|
/// Removes every named graph (with its quads), then every remaining quad.
///
/// Afterwards no quad and no named graph is left in the store.
pub fn clear(&mut self) -> Result<(), StorageError> {
    self.remove_all_named_graphs()?;
    self.clear_all_graphs()
}
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
pub struct StorageBulkLoader {
|
|
storage: Storage,
|
|
hooks: Vec<Box<dyn Fn(u64)>>,
|
|
num_threads: Option<usize>,
|
|
max_memory_size: Option<usize>,
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
impl StorageBulkLoader {
|
|
pub fn new(storage: Storage) -> Self {
|
|
Self {
|
|
storage,
|
|
hooks: Vec::new(),
|
|
num_threads: None,
|
|
max_memory_size: None,
|
|
}
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn set_num_threads(mut self, num_threads: usize) -> Self {
|
|
self.num_threads = Some(num_threads);
|
|
self
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
|
|
self.max_memory_size = Some(max_memory_size);
|
|
self
|
|
}
|
|
|
|
#[must_use]
|
|
pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
|
|
self.hooks.push(Box::new(callback));
|
|
self
|
|
}
|
|
|
|
#[allow(clippy::trait_duplication_in_bounds)]
|
|
pub fn load<EI, EO: From<StorageError> + From<EI>>(
|
|
&self,
|
|
quads: impl IntoIterator<Item = Result<Quad, EI>>,
|
|
) -> Result<(), EO> {
|
|
let num_threads = self.num_threads.unwrap_or(2);
|
|
if num_threads < 2 {
|
|
return Err(
|
|
StorageError::Other("The bulk loader needs at least 2 threads".into()).into(),
|
|
);
|
|
}
|
|
let batch_size = if let Some(max_memory_size) = self.max_memory_size {
|
|
max_memory_size * 1000 / num_threads
|
|
} else {
|
|
DEFAULT_BULK_LOAD_BATCH_SIZE
|
|
};
|
|
if batch_size < 10_000 {
|
|
return Err(StorageError::Other(
|
|
"The bulk loader memory bound is too low. It needs at least 100MB".into(),
|
|
)
|
|
.into());
|
|
}
|
|
let done_counter = AtomicU64::new(0);
|
|
let mut done_and_displayed_counter = 0;
|
|
thread::scope(|thread_scope| {
|
|
let mut threads = VecDeque::with_capacity(num_threads - 1);
|
|
let mut buffer = Vec::with_capacity(batch_size);
|
|
for quad in quads {
|
|
let quad = quad?;
|
|
buffer.push(quad);
|
|
if buffer.len() >= batch_size {
|
|
self.spawn_load_thread(
|
|
&mut buffer,
|
|
&mut threads,
|
|
thread_scope,
|
|
&done_counter,
|
|
&mut done_and_displayed_counter,
|
|
num_threads,
|
|
batch_size,
|
|
)?;
|
|
}
|
|
}
|
|
self.spawn_load_thread(
|
|
&mut buffer,
|
|
&mut threads,
|
|
thread_scope,
|
|
&done_counter,
|
|
&mut done_and_displayed_counter,
|
|
num_threads,
|
|
batch_size,
|
|
)?;
|
|
for thread in threads {
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??;
|
|
self.on_possible_progress(&done_counter, &mut done_and_displayed_counter);
|
|
}
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
fn spawn_load_thread<'scope>(
|
|
&'scope self,
|
|
buffer: &mut Vec<Quad>,
|
|
threads: &mut VecDeque<thread::ScopedJoinHandle<'scope, Result<(), StorageError>>>,
|
|
thread_scope: &'scope thread::Scope<'scope, '_>,
|
|
done_counter: &'scope AtomicU64,
|
|
done_and_displayed_counter: &mut u64,
|
|
num_threads: usize,
|
|
batch_size: usize,
|
|
) -> Result<(), StorageError> {
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter);
|
|
// We avoid to have too many threads
|
|
if threads.len() >= num_threads {
|
|
if let Some(thread) = threads.pop_front() {
|
|
map_thread_result(thread.join()).map_err(StorageError::Io)??;
|
|
self.on_possible_progress(done_counter, done_and_displayed_counter);
|
|
}
|
|
}
|
|
let mut buffer_to_load = Vec::with_capacity(batch_size);
|
|
swap(buffer, &mut buffer_to_load);
|
|
let storage = &self.storage;
|
|
threads.push_back(thread_scope.spawn(move || {
|
|
FileBulkLoader::new(storage, batch_size).load(buffer_to_load, done_counter)
|
|
}));
|
|
Ok(())
|
|
}
|
|
|
|
fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) {
|
|
let new_counter = done.load(Ordering::Relaxed);
|
|
let display_step = u64::try_from(DEFAULT_BULK_LOAD_BATCH_SIZE).unwrap();
|
|
if new_counter / display_step > *done_and_displayed / display_step {
|
|
for hook in &self.hooks {
|
|
hook(new_counter);
|
|
}
|
|
}
|
|
*done_and_displayed = new_counter;
|
|
}
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
struct FileBulkLoader<'a> {
|
|
storage: &'a Storage,
|
|
id2str: HashMap<StrHash, Box<str>>,
|
|
quads: HashSet<EncodedQuad>,
|
|
triples: HashSet<EncodedQuad>,
|
|
graphs: HashSet<EncodedTerm>,
|
|
}
|
|
|
|
#[cfg(not(target_family = "wasm"))]
|
|
impl<'a> FileBulkLoader<'a> {
|
|
fn new(storage: &'a Storage, batch_size: usize) -> Self {
|
|
Self {
|
|
storage,
|
|
id2str: HashMap::with_capacity(3 * batch_size),
|
|
quads: HashSet::with_capacity(batch_size),
|
|
triples: HashSet::with_capacity(batch_size),
|
|
graphs: HashSet::default(),
|
|
}
|
|
}
|
|
|
|
fn load(&mut self, quads: Vec<Quad>, counter: &AtomicU64) -> Result<(), StorageError> {
|
|
self.encode(quads)?;
|
|
let size = self.triples.len() + self.quads.len();
|
|
self.save()?;
|
|
counter.fetch_add(size.try_into().unwrap_or(u64::MAX), Ordering::Relaxed);
|
|
Ok(())
|
|
}
|
|
|
|
fn encode(&mut self, quads: Vec<Quad>) -> Result<(), StorageError> {
|
|
for quad in quads {
|
|
let encoded = EncodedQuad::from(quad.as_ref());
|
|
if quad.graph_name.is_default_graph() {
|
|
if self.triples.insert(encoded.clone()) {
|
|
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
|
|
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
|
|
self.insert_term(quad.object.as_ref(), &encoded.object)?;
|
|
}
|
|
} else if self.quads.insert(encoded.clone()) {
|
|
self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
|
|
self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
|
|
self.insert_term(quad.object.as_ref(), &encoded.object)?;
|
|
|
|
if self.graphs.insert(encoded.graph_name.clone()) {
|
|
self.insert_term(
|
|
match quad.graph_name.as_ref() {
|
|
GraphNameRef::NamedNode(n) => n.into(),
|
|
GraphNameRef::BlankNode(n) => n.into(),
|
|
GraphNameRef::DefaultGraph => {
|
|
return Err(CorruptionError::new(
|
|
"Default graph this not the default graph",
|
|
)
|
|
.into())
|
|
}
|
|
},
|
|
&encoded.graph_name,
|
|
)?;
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
fn save(&mut self) -> Result<(), StorageError> {
|
|
let mut to_load = Vec::new();
|
|
|
|
// id2str
|
|
if !self.id2str.is_empty() {
|
|
let mut id2str = take(&mut self.id2str)
|
|
.into_iter()
|
|
.map(|(k, v)| (k.to_be_bytes(), v))
|
|
.collect::<Vec<_>>();
|
|
id2str.sort_unstable();
|
|
let mut id2str_sst = self.storage.db.new_sst_file()?;
|
|
for (k, v) in id2str {
|
|
id2str_sst.insert(&k, v.as_bytes())?;
|
|
}
|
|
to_load.push((&self.storage.id2str_cf, id2str_sst.finish()?));
|
|
}
|
|
|
|
if !self.triples.is_empty() {
|
|
to_load.push((
|
|
&self.storage.dspo_cf,
|
|
self.build_sst_for_keys(
|
|
self.triples.iter().map(|quad| {
|
|
encode_term_triple(&quad.subject, &quad.predicate, &quad.object)
|
|
}),
|
|
)?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.dpos_cf,
|
|
self.build_sst_for_keys(
|
|
self.triples.iter().map(|quad| {
|
|
encode_term_triple(&quad.predicate, &quad.object, &quad.subject)
|
|
}),
|
|
)?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.dosp_cf,
|
|
self.build_sst_for_keys(
|
|
self.triples.iter().map(|quad| {
|
|
encode_term_triple(&quad.object, &quad.subject, &quad.predicate)
|
|
}),
|
|
)?,
|
|
));
|
|
self.triples.clear();
|
|
}
|
|
|
|
if !self.quads.is_empty() {
|
|
to_load.push((
|
|
&self.storage.graphs_cf,
|
|
self.build_sst_for_keys(self.graphs.iter().map(encode_term))?,
|
|
));
|
|
self.graphs.clear();
|
|
|
|
to_load.push((
|
|
&self.storage.gspo_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.graph_name,
|
|
&quad.subject,
|
|
&quad.predicate,
|
|
&quad.object,
|
|
)
|
|
}))?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.gpos_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.graph_name,
|
|
&quad.predicate,
|
|
&quad.object,
|
|
&quad.subject,
|
|
)
|
|
}))?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.gosp_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.graph_name,
|
|
&quad.object,
|
|
&quad.subject,
|
|
&quad.predicate,
|
|
)
|
|
}))?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.spog_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.subject,
|
|
&quad.predicate,
|
|
&quad.object,
|
|
&quad.graph_name,
|
|
)
|
|
}))?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.posg_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.predicate,
|
|
&quad.object,
|
|
&quad.subject,
|
|
&quad.graph_name,
|
|
)
|
|
}))?,
|
|
));
|
|
to_load.push((
|
|
&self.storage.ospg_cf,
|
|
self.build_sst_for_keys(self.quads.iter().map(|quad| {
|
|
encode_term_quad(
|
|
&quad.object,
|
|
&quad.subject,
|
|
&quad.predicate,
|
|
&quad.graph_name,
|
|
)
|
|
}))?,
|
|
));
|
|
self.quads.clear();
|
|
}
|
|
|
|
self.storage.db.insert_stt_files(&to_load)
|
|
}
|
|
|
|
fn insert_term(
|
|
&mut self,
|
|
term: TermRef<'_>,
|
|
encoded: &EncodedTerm,
|
|
) -> Result<(), StorageError> {
|
|
insert_term(term, encoded, &mut |key, value| {
|
|
self.id2str.entry(*key).or_insert_with(|| value.into());
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
fn build_sst_for_keys(
|
|
&self,
|
|
values: impl Iterator<Item = Vec<u8>>,
|
|
) -> Result<PathBuf, StorageError> {
|
|
let mut values = values.collect::<Vec<_>>();
|
|
values.sort_unstable();
|
|
let mut sst = self.storage.db.new_sst_file()?;
|
|
for value in values {
|
|
sst.insert_empty(&value)?;
|
|
}
|
|
sst.finish()
|
|
}
|
|
}
|
|
|
|
/// Converts the result of joining a loader thread into an `io::Result`.
///
/// A panic becomes an `io::ErrorKind::Other` error. Panic payloads are normally
/// a `&'static str` (literal message) or a `String` (formatted message). Both are
/// included in the error text; any other payload is reported as unknown.
/// (The previous `downcast::<&dyn Display>()` never matched a real payload.)
#[cfg(not(target_family = "wasm"))]
fn map_thread_result<R>(result: thread::Result<R>) -> io::Result<R> {
    result.map_err(|payload| {
        let message = if let Some(text) = payload.downcast_ref::<&str>() {
            format!("A loader process crashed with {text}")
        } else if let Some(text) = payload.downcast_ref::<String>() {
            format!("A loader process crashed with {text}")
        } else {
            "A loader process crashed with an unknown error".to_owned()
        };
        io::Error::new(io::ErrorKind::Other, message)
    })
}
|
|
|