#![allow(clippy::same_name_method)]
#[cfg(not(target_family = "wasm"))]
use crate::model::Quad;
use crate::model::{GraphNameRef, NamedOrBlankNodeRef, QuadRef, TermRef};
use crate::storage::backend::Transaction;
use crate::storage::binary_encoder::QuadEncoding;
pub use crate::storage::error::{CorruptionError, LoaderError, SerializerError, StorageError};
#[cfg(not(target_family = "wasm"))]
use crate::storage::numeric_encoder::{insert_term, EncodedQuad, EncodedTerm, StrHash, StrLookup};
use gfa::parser::GFAParser;
use handlegraph::{
    conversion::from_gfa,
    packedgraph::PackedGraph,
};
use std::str;
#[cfg(not(target_family = "wasm"))]
use std::collections::VecDeque;
#[cfg(not(target_family = "wasm"))]
use std::collections::{HashMap, HashSet};
#[cfg(not(target_family = "wasm"))]
use std::mem::swap;
#[cfg(not(target_family = "wasm"))]
use std::path::Path;
#[cfg(not(target_family = "wasm"))]
use std::sync::atomic::{AtomicU64, Ordering};
#[cfg(not(target_family = "wasm"))]
use std::sync::Arc;
#[cfg(not(target_family = "wasm"))]
use std::thread::spawn;
#[cfg(not(target_family = "wasm"))]
use std::thread::JoinHandle;

use self::storage_generator::StorageGenerator;

mod backend;
mod binary_encoder;
mod error;
mod storage_generator;
pub mod numeric_encoder;
pub mod small_string;
mod vg_vocab;

#[cfg(not(target_family = "wasm"))]
const DEFAULT_BULK_LOAD_BATCH_SIZE: usize = 1_000_000;

/// Low level storage primitives
#[derive(Clone)]
pub struct Storage {
    graph: PackedGraph,
    base: String,
}

impl Storage {
    pub fn new() -> Result<Self, StorageError> {
        Ok(Self {
            graph: PackedGraph::new(),
            base: "https://example.org".to_owned(),
        })
    }

    pub fn from_str(gfa: &str) -> Result<Self, StorageError> {
        let gfa_parser = GFAParser::new();
        let gfa = gfa_parser
            .parse_lines(gfa.lines().map(|s| s.as_bytes()))
            .map_err(|err| StorageError::Other(Box::new(err)))?;
        let graph = from_gfa::<PackedGraph>(&gfa);
        Ok(Self {
            graph,
            base: "https://example.org".to_owned(),
        })
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn open(path: &Path) -> Result<Self, StorageError> {
        let gfa_parser = GFAParser::new();
        let gfa = gfa_parser
            .parse_file(path)
            .map_err(|err| StorageError::Other(Box::new(err)))?;
        let graph = from_gfa::<PackedGraph>(&gfa);
        Ok(Self {
            graph,
            base: "https://example.org".to_owned(),
        })
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn open_secondary(primary_path: &Path) -> Result<Self, StorageError> {
        let gfa_parser = GFAParser::new();
        let gfa = gfa_parser
            .parse_file(primary_path)
            .map_err(|err| StorageError::Other(Box::new(err)))?;
        let graph = from_gfa::<PackedGraph>(&gfa);
        Ok(Self {
            graph,
            base: "https://example.org".to_owned(),
        })
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn open_persistent_secondary(
        primary_path: &Path,
        _secondary_path: &Path,
    ) -> Result<Self, StorageError> {
        let gfa_parser = GFAParser::new();
        let gfa = gfa_parser
            .parse_file(primary_path)
            .map_err(|err| StorageError::Other(Box::new(err)))?;
        let graph = from_gfa::<PackedGraph>(&gfa);
        Ok(Self {
            graph,
            base: "https://example.org".to_owned(),
        })
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn open_read_only(path: &Path) -> Result<Self, StorageError> {
        let gfa_parser = GFAParser::new();
        let gfa = gfa_parser
            .parse_file(path)
            .map_err(|err| StorageError::Other(Box::new(err)))?;
        let graph = from_gfa::<PackedGraph>(&gfa);
        Ok(Self {
            graph,
            base: "https://example.org".to_owned(),
        })
    }

    pub fn snapshot(&self) -> StorageReader {
        StorageReader::new(self.clone())
    }

    // pub fn transaction<'a, 'b: 'a, T, E: Error + 'static + From<StorageError>>(
    //     &'b self,
    //     f: impl Fn(StorageWriter<'a>) -> Result<T, E>,
    // ) -> Result<T, E> {
    //     // self.db.transaction(|transaction| {
    //     //     f(StorageWriter {
    //     //         buffer: Vec::new(),
    //     //         transaction,
    //     //         storage: self,
    //     //     })
    //     // })
    //     Err(StorageError::Io(std::io::Error::new(
    //         std::io::ErrorKind::Unsupported,
    //         "Transactions are currently not supported",
    //     )))
    // }

    #[cfg(not(target_family = "wasm"))]
    pub fn flush(&self) -> Result<(), StorageError> {
        Ok(())
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn compact(&self) -> Result<(), StorageError> {
        Ok(())
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn backup(&self, target_directory: &Path) -> Result<(), StorageError> {
        Ok(())
    }
}
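// Usage sketch (illustrative only, not part of the original sources): an in-memory
// `Storage` is built directly from GFA text and then queried through a snapshot.
// The GFA literal below is a hypothetical two-segment example assuming well-formed
// GFA v1 input.
//
//     let gfa = "S\t1\tACGT\nS\t2\tTTGA\nL\t1\t+\t2\t+\t0M\n";
//     let storage = Storage::from_str(gfa)?;
//     let reader = storage.snapshot();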
pub struct StorageReader {
    // reader: Reader,
    // storage: Storage,
    generator: StorageGenerator,
}

impl StorageReader {
    pub fn new(storage: Storage) -> Self {
        Self {
            generator: StorageGenerator::new(storage),
        }
    }

    pub fn len(&self) -> Result<usize, StorageError> {
        // Ok(self.reader.len(&self.storage.gspo_cf)? + self.reader.len(&self.storage.dspo_cf)?)
        Ok(0)
    }

    pub fn is_empty(&self) -> Result<bool, StorageError> {
        // Ok(self.reader.is_empty(&self.storage.gspo_cf)?
        //     && self.reader.is_empty(&self.storage.dspo_cf)?)
        Ok(true)
    }

    pub fn contains(&self, quad: &EncodedQuad) -> Result<bool, StorageError> {
        // let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
        // if quad.graph_name.is_default_graph() {
        //     write_spo_quad(&mut buffer, quad);
        //     Ok(self.reader.contains_key(&self.storage.dspo_cf, &buffer)?)
        // } else {
        //     write_gspo_quad(&mut buffer, quad);
        //     Ok(self.reader.contains_key(&self.storage.gspo_cf, &buffer)?)
        // }
        Ok(true)
    }

    pub fn quads_for_pattern(
        &self,
        subject: Option<&EncodedTerm>,
        predicate: Option<&EncodedTerm>,
        object: Option<&EncodedTerm>,
        graph_name: Option<&EncodedTerm>,
    ) -> ChainedDecodingQuadIterator {
        let graph_name = graph_name.expect("Graph name is given");
        self.generator
            .quads_for_pattern(subject, predicate, object, graph_name)
    }

    pub fn quads(&self) -> ChainedDecodingQuadIterator {
        ChainedDecodingQuadIterator::new(DecodingQuadIterator {
            terms: Vec::new(),
            encoding: QuadEncoding::Spog,
        })
        // ChainedDecodingQuadIterator::pair(self.dspo_quads(&[]), self.gspo_quads(&[]))
    }

    pub fn named_graphs(&self) -> DecodingGraphIterator {
        DecodingGraphIterator { terms: Vec::new() }
    }

    pub fn contains_named_graph(&self, graph_name: &EncodedTerm) -> Result<bool, StorageError> {
        // self.reader
        //     .contains_key(&self.storage.graphs_cf, &encode_term(graph_name))
        Ok(true)
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
        Ok(None)
    }

    #[cfg(not(target_family = "wasm"))]
    pub fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
        Ok(true)
    }

    /// Validates that all the storage invariants hold in the data
    #[cfg(not(target_family = "wasm"))]
    pub fn validate(&self) -> Result<(), StorageError> {
        Ok(())
    }
}
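// Call-pattern sketch (hypothetical variables): pattern matching is delegated to the
// `StorageGenerator`, and the current implementation requires a concrete graph name,
// so passing `None` as the fourth argument panics via `expect`.
//
//     let iter = reader.quads_for_pattern(Some(&subject), None, None, Some(&graph_name));
//     for quad in iter {
//         let quad = quad?;
//         // ...
//     }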
pub struct ChainedDecodingQuadIterator {
    first: DecodingQuadIterator,
    second: Option<DecodingQuadIterator>,
}

impl ChainedDecodingQuadIterator {
    fn new(first: DecodingQuadIterator) -> Self {
        Self {
            first,
            second: None,
        }
    }

    fn pair(first: DecodingQuadIterator, second: DecodingQuadIterator) -> Self {
        Self {
            first,
            second: Some(second),
        }
    }
}

impl Iterator for ChainedDecodingQuadIterator {
    type Item = Result<EncodedQuad, StorageError>;

    fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
        if let Some(result) = self.first.next() {
            Some(result)
        } else if let Some(second) = self.second.as_mut() {
            second.next()
        } else {
            None
        }
    }
}

pub struct DecodingQuadIterator {
    terms: Vec<EncodedQuad>,
    encoding: QuadEncoding,
}

impl Iterator for DecodingQuadIterator {
    type Item = Result<EncodedQuad, StorageError>;

    fn next(&mut self) -> Option<Result<EncodedQuad, StorageError>> {
        // if let Err(e) = self.iter.status() {
        //     return Some(Err(e));
        // }
        // let term = self.encoding.decode(self.iter.key()?);
        // self.iter.next();
        self.terms.pop().map(Ok)
    }
}

pub struct DecodingGraphIterator {
    terms: Vec<EncodedTerm>,
}

impl Iterator for DecodingGraphIterator {
    type Item = Result<EncodedTerm, StorageError>;

    fn next(&mut self) -> Option<Result<EncodedTerm, StorageError>> {
        // if let Err(e) = self.iter.status() {
        //     return Some(Err(e));
        // }
        // let term = self.encoding.decode(self.iter.key()?);
        // self.iter.next();
        self.terms.pop().map(Ok)
    }
}

impl StrLookup for StorageReader {
    fn get_str(&self, key: &StrHash) -> Result<Option<String>, StorageError> {
        self.get_str(key)
    }

    fn contains_str(&self, key: &StrHash) -> Result<bool, StorageError> {
        self.contains_str(key)
    }
}

pub struct StorageWriter<'a> {
    buffer: Vec<u8>,
    transaction: Transaction<'a>,
    storage: &'a Storage,
}

impl<'a> StorageWriter<'a> {
    pub fn reader(&self) -> StorageReader {
        StorageReader::new(self.storage.clone())
    }

    pub fn insert(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
        Ok(true)
        // let encoded = quad.into();
        // self.buffer.clear();
        // let result = if quad.graph_name.is_default_graph() {
        //     write_spo_quad(&mut self.buffer, &encoded);
        //     if self
        //         .transaction
        //         .contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
        //     {
        //         false
        //     } else {
        //         self.transaction
        //             .insert_empty(&self.storage.dspo_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_pos_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.dpos_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_osp_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.dosp_cf, &self.buffer)?;
        //         self.insert_term(quad.subject.into(), &encoded.subject)?;
        //         self.insert_term(quad.predicate.into(), &encoded.predicate)?;
        //         self.insert_term(quad.object, &encoded.object)?;
        //         true
        //     }
        // } else {
        //     write_spog_quad(&mut self.buffer, &encoded);
        //     if self
        //         .transaction
        //         .contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
        //     {
        //         false
        //     } else {
        //         self.transaction
        //             .insert_empty(&self.storage.spog_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_posg_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.posg_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_ospg_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.ospg_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_gspo_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.gspo_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_gpos_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.gpos_cf, &self.buffer)?;
        //         self.buffer.clear();
        //         write_gosp_quad(&mut self.buffer, &encoded);
        //         self.transaction
        //             .insert_empty(&self.storage.gosp_cf, &self.buffer)?;
        //         self.insert_term(quad.subject.into(), &encoded.subject)?;
        //         self.insert_term(quad.predicate.into(), &encoded.predicate)?;
        //         self.insert_term(quad.object, &encoded.object)?;
        //         self.buffer.clear();
        //         write_term(&mut self.buffer, &encoded.graph_name);
        //         if !self
        //             .transaction
        //             .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
        //         {
        //             self.transaction
        //                 .insert_empty(&self.storage.graphs_cf, &self.buffer)?;
        //             self.insert_graph_name(quad.graph_name, &encoded.graph_name)?;
        //         }
        //         true
        //     }
        // };
        // Ok(result)
    }

    pub fn insert_named_graph(
        &mut self,
        graph_name: NamedOrBlankNodeRef<'_>,
    ) -> Result<bool, StorageError> {
        Ok(true)
        // let encoded_graph_name = graph_name.into();
        // self.buffer.clear();
        // write_term(&mut self.buffer, &encoded_graph_name);
        // let result = if self
        //     .transaction
        //     .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
        // {
        //     false
        // } else {
        //     self.transaction
        //         .insert_empty(&self.storage.graphs_cf, &self.buffer)?;
        //     self.insert_term(graph_name.into(), &encoded_graph_name)?;
        //     true
        // };
        // Ok(result)
    }

    // fn insert_term(
    //     &mut self,
    //     term: TermRef<'_>,
    //     encoded: &EncodedTerm,
    // ) -> Result<(), StorageError> {
    //     insert_term(term, encoded, &mut |key, value| self.insert_str(key, value))
    // }

    // fn insert_graph_name(
    //     &mut self,
    //     graph_name: GraphNameRef<'_>,
    //     encoded: &EncodedTerm,
    // ) -> Result<(), StorageError> {
    //     match graph_name {
    //         GraphNameRef::NamedNode(graph_name) => self.insert_term(graph_name.into(), encoded),
    //         GraphNameRef::BlankNode(graph_name) => self.insert_term(graph_name.into(), encoded),
    //         GraphNameRef::DefaultGraph => Ok(()),
    //     }
    // }

    // #[cfg(not(target_family = "wasm"))]
    // fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
    //     if self
    //         .storage
    //         .db
    //         .contains_key(&self.storage.id2str_cf, &key.to_be_bytes())?
    //     {
    //         return Ok(());
    //     }
    //     self.storage.db.insert(
    //         &self.storage.id2str_cf,
    //         &key.to_be_bytes(),
    //         value.as_bytes(),
    //     )
    // }

    // #[cfg(target_family = "wasm")]
    // fn insert_str(&mut self, key: &StrHash, value: &str) -> Result<(), StorageError> {
    //     self.transaction.insert(
    //         &self.storage.id2str_cf,
    //         &key.to_be_bytes(),
    //         value.as_bytes(),
    //     )
    // }

    pub fn remove(&mut self, quad: QuadRef<'_>) -> Result<bool, StorageError> {
        // self.remove_encoded(&quad.into())
        Ok(true)
    }

    // fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<bool, StorageError> {
    //     self.buffer.clear();
    //     let result = if quad.graph_name.is_default_graph() {
    //         write_spo_quad(&mut self.buffer, quad);
    //         if self
    //             .transaction
    //             .contains_key_for_update(&self.storage.dspo_cf, &self.buffer)?
    //         {
    //             self.transaction
    //                 .remove(&self.storage.dspo_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_pos_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.dpos_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_osp_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.dosp_cf, &self.buffer)?;
    //             true
    //         } else {
    //             false
    //         }
    //     } else {
    //         write_spog_quad(&mut self.buffer, quad);
    //         if self
    //             .transaction
    //             .contains_key_for_update(&self.storage.spog_cf, &self.buffer)?
    //         {
    //             self.transaction
    //                 .remove(&self.storage.spog_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_posg_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.posg_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_ospg_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.ospg_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_gspo_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.gspo_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_gpos_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.gpos_cf, &self.buffer)?;
    //             self.buffer.clear();
    //             write_gosp_quad(&mut self.buffer, quad);
    //             self.transaction
    //                 .remove(&self.storage.gosp_cf, &self.buffer)?;
    //             true
    //         } else {
    //             false
    //         }
    //     };
    //     Ok(result)
    // }

    pub fn clear_graph(&mut self, graph_name: GraphNameRef<'_>) -> Result<(), StorageError> {
        // if graph_name.is_default_graph() {
        //     for quad in self.reader().quads_for_graph(&EncodedTerm::DefaultGraph) {
        //         self.remove_encoded(&quad?)?;
        //     }
        // } else {
        //     self.buffer.clear();
        //     write_term(&mut self.buffer, &graph_name.into());
        //     if self
        //         .transaction
        //         .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
        //     {
        //         // The condition is useful to lock the graph itself and ensure no quad is inserted at the same time
        //         for quad in self.reader().quads_for_graph(&graph_name.into()) {
        //             self.remove_encoded(&quad?)?;
        //         }
        //     }
        // }
        Ok(())
    }

    pub fn clear_all_named_graphs(&mut self) -> Result<(), StorageError> {
        // for quad in self.reader().quads_in_named_graph() {
        //     self.remove_encoded(&quad?)?;
        // }
        Ok(())
    }

    pub fn clear_all_graphs(&mut self) -> Result<(), StorageError> {
        // for quad in self.reader().quads() {
        //     self.remove_encoded(&quad?)?;
        // }
        Ok(())
    }

    pub fn remove_named_graph(
        &mut self,
        graph_name: NamedOrBlankNodeRef<'_>,
    ) -> Result<bool, StorageError> {
        // self.remove_encoded_named_graph(&graph_name.into())
        Ok(true)
    }

    // fn remove_encoded_named_graph(
    //     &mut self,
    //     graph_name: &EncodedTerm,
    // ) -> Result<bool, StorageError> {
    //     self.buffer.clear();
    //     write_term(&mut self.buffer, graph_name);
    //     let result = if self
    //         .transaction
    //         .contains_key_for_update(&self.storage.graphs_cf, &self.buffer)?
    //     {
    //         // The condition is done ASAP to lock the graph itself
    //         for quad in self.reader().quads_for_graph(graph_name) {
    //             self.remove_encoded(&quad?)?;
    //         }
    //         self.buffer.clear();
    //         write_term(&mut self.buffer, graph_name);
    //         self.transaction
    //             .remove(&self.storage.graphs_cf, &self.buffer)?;
    //         true
    //     } else {
    //         false
    //     };
    //     Ok(result)
    // }

    pub fn remove_all_named_graphs(&mut self) -> Result<(), StorageError> {
        // for graph_name in self.reader().named_graphs() {
        //     self.remove_encoded_named_graph(&graph_name?)?;
        // }
        Ok(())
    }

    pub fn clear(&mut self) -> Result<(), StorageError> {
        // for graph_name in self.reader().named_graphs() {
        //     self.remove_encoded_named_graph(&graph_name?)?;
        // }
        // for quad in self.reader().quads() {
        //     self.remove_encoded(&quad?)?;
        // }
        Ok(())
    }
}
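// Usage sketch for the bulk loader below (illustrative only; `quads` stands for any
// iterator of `Result<Quad, _>` items): the loader partitions its input into batches
// and loads them on worker threads, reporting progress through the registered hooks.
//
//     let loader = StorageBulkLoader::new(storage)
//         .set_num_threads(4)
//         .on_progress(|count| eprintln!("{count} quads loaded"));
//     loader.load::<StorageError, StorageError>(quads)?;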
#[cfg(not(target_family = "wasm"))]
pub struct StorageBulkLoader {
    storage: Storage,
    hooks: Vec<Box<dyn Fn(u64)>>,
    num_threads: Option<usize>,
    max_memory_size: Option<usize>,
}

#[cfg(not(target_family = "wasm"))]
impl StorageBulkLoader {
    pub fn new(storage: Storage) -> Self {
        Self {
            storage,
            hooks: Vec::new(),
            num_threads: None,
            max_memory_size: None,
        }
    }

    pub fn set_num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = Some(num_threads);
        self
    }

    pub fn set_max_memory_size_in_megabytes(mut self, max_memory_size: usize) -> Self {
        self.max_memory_size = Some(max_memory_size);
        self
    }

    pub fn on_progress(mut self, callback: impl Fn(u64) + 'static) -> Self {
        self.hooks.push(Box::new(callback));
        self
    }

    #[allow(clippy::trait_duplication_in_bounds)]
    pub fn load<EI, EO: From<StorageError> + From<EI>>(
        &self,
        quads: impl IntoIterator<Item = Result<Quad, EI>>,
    ) -> Result<(), EO> {
        let num_threads = self.num_threads.unwrap_or(2);
        if num_threads < 2 {
            return Err(
                StorageError::Other("The bulk loader needs at least 2 threads".into()).into(),
            );
        }
        let batch_size = if let Some(max_memory_size) = self.max_memory_size {
            max_memory_size * 1000 / num_threads
        } else {
            DEFAULT_BULK_LOAD_BATCH_SIZE
        };
        if batch_size < 10_000 {
            return Err(StorageError::Other(
                "The bulk loader memory bound is too low. It needs at least 100MB".into(),
            )
            .into());
        }
        let mut threads = VecDeque::with_capacity(num_threads - 1);
        let mut buffer = Vec::with_capacity(batch_size);
        let done_counter = Arc::new(AtomicU64::new(0));
        let mut done_and_displayed_counter = 0;
        for quad in quads {
            let quad = quad?;
            buffer.push(quad);
            if buffer.len() >= batch_size {
                self.spawn_load_thread(
                    &mut buffer,
                    &mut threads,
                    &done_counter,
                    &mut done_and_displayed_counter,
                    num_threads,
                    batch_size,
                )?;
            }
        }
        self.spawn_load_thread(
            &mut buffer,
            &mut threads,
            &done_counter,
            &mut done_and_displayed_counter,
            num_threads,
            batch_size,
        )?;
        for thread in threads {
            thread.join().unwrap()?;
            self.on_possible_progress(&done_counter, &mut done_and_displayed_counter);
        }
        Ok(())
    }

    fn spawn_load_thread(
        &self,
        buffer: &mut Vec<Quad>,
        threads: &mut VecDeque<JoinHandle<Result<(), StorageError>>>,
        done_counter: &Arc<AtomicU64>,
        done_and_displayed_counter: &mut u64,
        num_threads: usize,
        batch_size: usize,
    ) -> Result<(), StorageError> {
        self.on_possible_progress(done_counter, done_and_displayed_counter);
        // We avoid having too many threads
        if threads.len() >= num_threads {
            if let Some(thread) = threads.pop_front() {
                thread.join().unwrap()?;
                self.on_possible_progress(done_counter, done_and_displayed_counter);
            }
        }
        let mut buffer_to_load = Vec::with_capacity(batch_size);
        swap(buffer, &mut buffer_to_load);
        let storage = self.storage.clone();
        let done_counter_clone = Arc::clone(done_counter);
        threads.push_back(spawn(move || {
            FileBulkLoader::new(storage, batch_size).load(buffer_to_load, &done_counter_clone)
        }));
        Ok(())
    }

    fn on_possible_progress(&self, done: &AtomicU64, done_and_displayed: &mut u64) {
        let new_counter = done.load(Ordering::Relaxed);
        let display_step = u64::try_from(DEFAULT_BULK_LOAD_BATCH_SIZE).unwrap();
        if new_counter / display_step > *done_and_displayed / display_step {
            for hook in &self.hooks {
                hook(new_counter);
            }
        }
        *done_and_displayed = new_counter;
    }
}

#[cfg(not(target_family = "wasm"))]
struct FileBulkLoader {
    storage: Storage,
    id2str: HashMap<StrHash, Box<str>>,
    quads: HashSet<EncodedQuad>,
    triples: HashSet<EncodedQuad>,
    graphs: HashSet<EncodedTerm>,
}

#[cfg(not(target_family = "wasm"))]
impl FileBulkLoader {
    fn new(storage: Storage, batch_size: usize) -> Self {
        Self {
            storage,
            id2str: HashMap::with_capacity(3 * batch_size),
            quads: HashSet::with_capacity(batch_size),
            triples: HashSet::with_capacity(batch_size),
            graphs: HashSet::default(),
        }
    }

    fn load(&mut self, quads: Vec<Quad>, counter: &AtomicU64) -> Result<(), StorageError> {
        self.encode(quads)?;
        let size = self.triples.len() + self.quads.len();
        // self.save()?;
        counter.fetch_add(size.try_into().unwrap(), Ordering::Relaxed);
        Ok(())
    }

    fn encode(&mut self, quads: Vec<Quad>) -> Result<(), StorageError> {
        for quad in quads {
            let encoded = EncodedQuad::from(quad.as_ref());
            if quad.graph_name.is_default_graph() {
                if self.triples.insert(encoded.clone()) {
                    self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
                    self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
                    self.insert_term(quad.object.as_ref(), &encoded.object)?;
                }
            } else if self.quads.insert(encoded.clone()) {
                self.insert_term(quad.subject.as_ref().into(), &encoded.subject)?;
                self.insert_term(quad.predicate.as_ref().into(), &encoded.predicate)?;
                self.insert_term(quad.object.as_ref(), &encoded.object)?;
                if self.graphs.insert(encoded.graph_name.clone()) {
                    self.insert_term(
                        match quad.graph_name.as_ref() {
                            GraphNameRef::NamedNode(n) => n.into(),
                            GraphNameRef::BlankNode(n) => n.into(),
                            GraphNameRef::DefaultGraph => unreachable!(),
                        },
                        &encoded.graph_name,
                    )?;
                }
            }
        }
        Ok(())
    }

    // fn save(&mut self) -> Result<(), StorageError> {
    //     let mut to_load = Vec::new();
    //     // id2str
    //     if !self.id2str.is_empty() {
    //         let mut id2str = take(&mut self.id2str)
    //             .into_iter()
    //             .map(|(k, v)| (k.to_be_bytes(), v))
    //             .collect::<Vec<_>>();
    //         id2str.sort_unstable();
    //         let mut id2str_sst = self.storage.db.new_sst_file()?;
    //         for (k, v) in id2str {
    //             id2str_sst.insert(&k, v.as_bytes())?;
    //         }
    //         to_load.push((&self.storage.id2str_cf, id2str_sst.finish()?));
    //     }
    //     if !self.triples.is_empty() {
    //         to_load.push((
    //             &self.storage.dspo_cf,
    //             self.build_sst_for_keys(
    //                 self.triples.iter().map(|quad| {
    //                     encode_term_triple(&quad.subject, &quad.predicate, &quad.object)
    //                 }),
    //             )?,
    //         ));
    //         to_load.push((
    //             &self.storage.dpos_cf,
    //             self.build_sst_for_keys(
    //                 self.triples.iter().map(|quad| {
    //                     encode_term_triple(&quad.predicate, &quad.object, &quad.subject)
    //                 }),
    //             )?,
    //         ));
    //         to_load.push((
    //             &self.storage.dosp_cf,
    //             self.build_sst_for_keys(
    //                 self.triples.iter().map(|quad| {
    //                     encode_term_triple(&quad.object, &quad.subject, &quad.predicate)
    //                 }),
    //             )?,
    //         ));
    //         self.triples.clear();
    //     }
    //     if !self.quads.is_empty() {
    //         to_load.push((
    //             &self.storage.graphs_cf,
    //             self.build_sst_for_keys(self.graphs.iter().map(encode_term))?,
    //         ));
    //         self.graphs.clear();
    //         to_load.push((
    //             &self.storage.gspo_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.graph_name,
    //                     &quad.subject,
    //                     &quad.predicate,
    //                     &quad.object,
    //                 )
    //             }))?,
    //         ));
    //         to_load.push((
    //             &self.storage.gpos_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.graph_name,
    //                     &quad.predicate,
    //                     &quad.object,
    //                     &quad.subject,
    //                 )
    //             }))?,
    //         ));
    //         to_load.push((
    //             &self.storage.gosp_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.graph_name,
    //                     &quad.object,
    //                     &quad.subject,
    //                     &quad.predicate,
    //                 )
    //             }))?,
    //         ));
    //         to_load.push((
    //             &self.storage.spog_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.subject,
    //                     &quad.predicate,
    //                     &quad.object,
    //                     &quad.graph_name,
    //                 )
    //             }))?,
    //         ));
    //         to_load.push((
    //             &self.storage.posg_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.predicate,
    //                     &quad.object,
    //                     &quad.subject,
    //                     &quad.graph_name,
    //                 )
    //             }))?,
    //         ));
    //         to_load.push((
    //             &self.storage.ospg_cf,
    //             self.build_sst_for_keys(self.quads.iter().map(|quad| {
    //                 encode_term_quad(
    //                     &quad.object,
    //                     &quad.subject,
    //                     &quad.predicate,
    //                     &quad.graph_name,
    //                 )
    //             }))?,
    //         ));
    //         self.quads.clear();
    //     }
    //     self.storage.db.insert_stt_files(&to_load)
    // }

    fn insert_term(
        &mut self,
        term: TermRef<'_>,
        encoded: &EncodedTerm,
    ) -> Result<(), StorageError> {
        insert_term(term, encoded, &mut |key, value| {
            self.id2str.entry(*key).or_insert_with(|| value.into());
            Ok(())
        })
    }

    // fn build_sst_for_keys(
    //     &self,
    //     values: impl Iterator<Item = Vec<u8>>,
    // ) -> Result<PathBuf, StorageError> {
    //     let mut values = values.collect::<Vec<_>>();
    //     values.sort_unstable();
    //     let mut sst = self.storage.db.new_sst_file()?;
    //     for value in values {
    //         sst.insert_empty(&value)?;
    //     }
    //     sst.finish()
    // }
}
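// A small sanity-check sketch (not in the original file): it only exercises the public
// constructors above and assumes the `gfa` crate accepts this minimal GFA v1 snippet.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn empty_storage_can_be_created() {
        assert!(Storage::new().is_ok());
    }

    #[test]
    fn storage_can_be_built_from_gfa_text() {
        // Two segments joined by a single link; tab-separated GFA v1.
        let gfa = "S\t1\tACGT\nS\t2\tTTGA\nL\t1\t+\t2\t+\t0M\n";
        let storage = Storage::from_str(gfa).expect("GFA text should parse");
        // A snapshot only clones the underlying graph, so it should always succeed.
        let _reader = storage.snapshot();
    }
}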