Add rough outline of node function

pull/825/head
Peter Heringer 1 year ago
parent 44396ca1b5
commit d8c4669af6
  1. 319
      lib/src/storage/mod.rs

@ -1,7 +1,7 @@
#![allow(clippy::same_name_method)]
#[cfg(not(target_family = "wasm"))]
use crate::model::Quad;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, Term, TermRef};
use crate::storage::backend::{Reader, Transaction};
#[cfg(not(target_family = "wasm"))]
use crate::storage::binary_encoder::LATEST_STORAGE_VERSION;
@ -16,7 +16,10 @@ pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, S
use crate::storage::numeric_encoder::Decoder;
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use backend::{ColumnFamily, ColumnFamilyDefinition, Db, Iter};
use handlegraph::packedgraph::PackedGraph;
use gfa::parser::GFAParser;
use handlegraph::{conversion::from_gfa, packedgraph::PackedGraph};
use oxrdf::NamedNode;
#[cfg(not(target_family = "wasm"))]
use std::collections::VecDeque;
#[cfg(not(target_family = "wasm"))]
@ -61,25 +64,10 @@ const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;
#[derive(Clone)]
pub struct Storage {
graph: PackedGraph,
// db: Db,
// #[cfg(not(target_family = "wasm"))]
// default_cf: ColumnFamily,
// id2str_cf: ColumnFamily,
// spog_cf: ColumnFamily,
// posg_cf: ColumnFamily,
// ospg_cf: ColumnFamily,
// gspo_cf: ColumnFamily,
// gpos_cf: ColumnFamily,
// gosp_cf: ColumnFamily,
// dspo_cf: ColumnFamily,
// dpos_cf: ColumnFamily,
// dosp_cf: ColumnFamily,
// graphs_cf: ColumnFamily,
}
impl Storage {
pub fn new() -> Result<Self, StorageError> {
// Self::setup(Db::new(Self::column_families())?)
Ok(Self {
graph: PackedGraph::new(),
})
@ -87,22 +75,22 @@ impl Storage {
#[cfg(not(target_family = "wasm"))]
pub fn open(path: &Path) -> Result<Self, StorageError> {
// Self::setup(Db::open_read_write(Some(path), Self::column_families())?)
Ok(Self {
graph: PackedGraph::new(),
})
let gfa_parser = GFAParser::new();
let gfa = gfa_parser
.parse_file(path)
.map_err(|err| StorageError::Other(Box::new(err)))?;
let graph = from_gfa::<PackedGraph, ()>(&gfa);
Ok(Self { graph })
}
#[cfg(not(target_family = "wasm"))]
pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
// Self::setup(Db::open_secondary(
// primary_path,
// None,
// Self::column_families(),
// )?)
Ok(Self {
graph: PackedGraph::new(),
})
let gfa_parser = GFAParser::new();
let gfa = gfa_parser
.parse_file(primary_path)
.map_err(|err| StorageError::Other(Box::new(err)))?;
let graph = from_gfa::<PackedGraph, ()>(&gfa);
Ok(Self { graph })
}
#[cfg(not(target_family = "wasm"))]
@ -110,177 +98,24 @@ impl Storage {
primary_path: &Path,
secondary_path: &Path,
) -> Result<Self, StorageError> {
// Self::setup(Db::open_secondary(
// primary_path,
// Some(secondary_path),
// Self::column_families(),
// )?)
Ok(Self {
graph: PackedGraph::new(),
})
let gfa_parser = GFAParser::new();
let gfa = gfa_parser
.parse_file(primary_path)
.map_err(|err| StorageError::Other(Box::new(err)))?;
let graph = from_gfa::<PackedGraph, ()>(&gfa);
Ok(Self { graph })
}
#[cfg(not(target_family = "wasm"))]
pub fn open_read_only(path: &Path) -> Result<Self, StorageError> {
// Self::setup(Db::open_read_only(path, Self::column_families())?)
Ok(Self {
graph: PackedGraph::new(),
})
let gfa_parser = GFAParser::new();
let gfa = gfa_parser
.parse_file(path)
.map_err(|err| StorageError::Other(Box::new(err)))?;
let graph = from_gfa::<PackedGraph, ()>(&gfa);
Ok(Self { graph })
}
// fn column_families() -> Vec<ColumnFamilyDefinition> {
// vec![
// ColumnFamilyDefinition {
// name: ID2STR_CF,
// use_iter: false,
// min_prefix_size: 0,
// unordered_writes: true,
// },
// ColumnFamilyDefinition {
// name: SPOG_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: POSG_CF,
// use_iter: true,
// min_prefix_size: 17, // named node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: OSPG_CF,
// use_iter: true,
// min_prefix_size: 0, // There are small literals...
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: GSPO_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: GPOS_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: GOSP_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: DSPO_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: DPOS_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: DOSP_CF,
// use_iter: true,
// min_prefix_size: 0, // There are small literals...
// unordered_writes: false,
// },
// ColumnFamilyDefinition {
// name: GRAPHS_CF,
// use_iter: true,
// min_prefix_size: 17, // named or blank node start
// unordered_writes: false,
// },
// ]
// }
// #[allow(clippy::unnecessary_wraps, clippy::unwrap_in_result)]
// fn setup(db: Db) -> Result<Self, StorageError> {
// let this = Self {
// #[cfg(not(target_family = "wasm"))]
// default_cf: db.column_family(DEFAULT_CF).unwrap(),
// id2str_cf: db.column_family(ID2STR_CF).unwrap(),
// spog_cf: db.column_family(SPOG_CF).unwrap(),
// posg_cf: db.column_family(POSG_CF).unwrap(),
// ospg_cf: db.column_family(OSPG_CF).unwrap(),
// gspo_cf: db.column_family(GSPO_CF).unwrap(),
// gpos_cf: db.column_family(GPOS_CF).unwrap(),
// gosp_cf: db.column_family(GOSP_CF).unwrap(),
// dspo_cf: db.column_family(DSPO_CF).unwrap(),
// dpos_cf: db.column_family(DPOS_CF).unwrap(),
// dosp_cf: db.column_family(DOSP_CF).unwrap(),
// graphs_cf: db.column_family(GRAPHS_CF).unwrap(),
// db,
// };
// #[cfg(not(target_family = "wasm"))]
// this.migrate()?;
// Ok(this)
// }
// #[cfg(not(target_family = "wasm"))]
// fn migrate(&self) -> Result<(), StorageError> {
// let mut version = self.ensure_version()?;
// if version == 0 {
// // We migrate to v1
// let mut graph_names = HashSet::new();
// for quad in self.snapshot().quads() {
// let quad = quad?;
// if !quad.graph_name.is_default_graph() {
// graph_names.insert(quad.graph_name);
// }
// }
// let mut graph_names = graph_names
// .into_iter()
// .map(|g| encode_term(&g))
// .collect::<Vec<_>>();
// graph_names.sort_unstable();
// let mut stt_file = self.db.new_sst_file()?;
// for k in graph_names {
// stt_file.insert_empty(&k)?;
// }
// self.db
// .insert_stt_files(&[(&self.graphs_cf, stt_file.finish()?)])?;
// version = 1;
// self.update_version(version)?;
// }
// match version {
// _ if version < LATEST_STORAGE_VERSION => Err(CorruptionError::msg(format!(
// "The RocksDB database is using the outdated encoding version {version}. Automated migration is not supported, please dump the store dataset using a compatible Oxigraph version and load it again using the current version"
// )).into()),
// LATEST_STORAGE_VERSION => Ok(()),
// _ => Err(CorruptionError::msg(format!(
// "The RocksDB database is using the too recent version {version}. Upgrade to the latest Oxigraph version to load this database"
// )).into())
// }
// }
// #[cfg(not(target_family = "wasm"))]
// fn ensure_version(&self) -> Result<u64, StorageError> {
// Ok(
// if let Some(version) = self.db.get(&self.default_cf, b"oxversion")? {
// u64::from_be_bytes(version.as_ref().try_into().map_err(|e| {
// CorruptionError::new(format!("Error while parsing the version key: {e}"))
// })?)
// } else {
// self.update_version(LATEST_STORAGE_VERSION)?;
// LATEST_STORAGE_VERSION
// },
// )
// }
// #[cfg(not(target_family = "wasm"))]
// fn update_version(&self, version: u64) -> Result<(), StorageError> {
// self.db
// .insert(&self.default_cf, b"oxversion", &version.to_be_bytes())?;
// self.db.flush(&self.default_cf)
// }
pub fn snapshot(&self) -> StorageReader {
StorageReader {
// reader: self.db.snapshot(),
@ -306,39 +141,16 @@ impl Storage {
// }
#[cfg(not(target_family = "wasm"))]
pub fn flush(&self) -> Result<(), StorageError> {
// self.db.flush(&self.default_cf)?;
// self.db.flush(&self.gspo_cf)?;
// self.db.flush(&self.gpos_cf)?;
// self.db.flush(&self.gosp_cf)?;
// self.db.flush(&self.spog_cf)?;
// self.db.flush(&self.posg_cf)?;
// self.db.flush(&self.ospg_cf)?;
// self.db.flush(&self.dspo_cf)?;
// self.db.flush(&self.dpos_cf)?;
// self.db.flush(&self.dosp_cf)?;
// self.db.flush(&self.id2str_cf)
Ok(())
}
#[cfg(not(target_family = "wasm"))]
pub fn compact(&self) -> Result<(), StorageError> {
// self.db.compact(&self.default_cf)?;
// self.db.compact(&self.gspo_cf)?;
// self.db.compact(&self.gpos_cf)?;
// self.db.compact(&self.gosp_cf)?;
// self.db.compact(&self.spog_cf)?;
// self.db.compact(&self.posg_cf)?;
// self.db.compact(&self.ospg_cf)?;
// self.db.compact(&self.dspo_cf)?;
// self.db.compact(&self.dpos_cf)?;
// self.db.compact(&self.dosp_cf)?;
// self.db.compact(&self.id2str_cf)
Ok(())
}
#[cfg(not(target_family = "wasm"))]
pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
// self.db.backup(target_directory)
Ok(())
}
}
@ -379,6 +191,10 @@ impl StorageReader {
object: Option<&EncodedTerm>,
graph_name: Option<&EncodedTerm>,
) -> ChainedDecodingQuadIterator {
let sub = subject.map(|s| self.decode_term(s).ok()).flatten();
let pre = predicate.map(|s| self.decode_term(s).ok()).flatten();
let obj = object.map(|s| self.decode_term(s).ok()).flatten();
self.nodes(&sub, &pre, &obj);
return ChainedDecodingQuadIterator {
first: DecodingQuadIterator {
terms: Vec::new(),
@ -386,60 +202,21 @@ impl StorageReader {
},
second: None,
};
// match subject {
// Some(subject) => match predicate {
// Some(predicate) => match object {
// Some(object) => match graph_name {
// Some(graph_name) => self.quads_for_subject_predicate_object_graph(
// subject, predicate, object, graph_name,
// ),
// None => self.quads_for_subject_predicate_object(subject, predicate, object),
// },
// None => match graph_name {
// Some(graph_name) => {
// self.quads_for_subject_predicate_graph(subject, predicate, graph_name)
// }
// None => self.quads_for_subject_predicate(subject, predicate),
// },
// },
// None => match object {
// Some(object) => match graph_name {
// Some(graph_name) => {
// self.quads_for_subject_object_graph(subject, object, graph_name)
// }
// None => self.quads_for_subject_object(subject, object),
// },
// None => match graph_name {
// Some(graph_name) => self.quads_for_subject_graph(subject, graph_name),
// None => self.quads_for_subject(subject),
// },
// },
// },
// None => match predicate {
// Some(predicate) => match object {
// Some(object) => match graph_name {
// Some(graph_name) => {
// self.quads_for_predicate_object_graph(predicate, object, graph_name)
// }
// None => self.quads_for_predicate_object(predicate, object),
// },
// None => match graph_name {
// Some(graph_name) => self.quads_for_predicate_graph(predicate, graph_name),
// None => self.quads_for_predicate(predicate),
// },
// },
// None => match object {
// Some(object) => match graph_name {
// Some(graph_name) => self.quads_for_object_graph(object, graph_name),
// None => self.quads_for_object(object),
// },
// None => match graph_name {
// Some(graph_name) => self.quads_for_graph(graph_name),
// None => self.quads(),
// },
// },
// },
// }
}
fn nodes(&self, subject: &Option<Term>, predicate: &Option<Term>, object: &Option<Term>) {
match subject {
Some(sub) => {
let is_node_iri = self.is_node_iri_in_graph(sub);
}
None => {}
}
}
fn is_node_iri_in_graph(&self, term: &Term) -> bool {
let named_node: NamedNodeRef = term.into();
// term.is_named_node() && Ok(term.into::<NamedNode>())
true
}
pub fn quads(&self) -> ChainedDecodingQuadIterator {

Loading…
Cancel
Save