Breaking: Adds transaction support to Sled

Breaks Sled storage format
pull/46/head
Tpt 4 years ago
parent c309a5ccdb
commit fd1bb216be
  1. 13
      lib/src/store/memory.rs
  2. 68
      lib/src/store/numeric_encoder.rs
  3. 70
      lib/src/store/rocksdb.rs
  4. 491
      lib/src/store/sled.rs

@ -1140,15 +1140,10 @@ fn label(g: &MemoryStore, hashes: &TrivialHashMap<EncodedTerm, u64>) -> Vec<Vec<
.into_iter() .into_iter()
.map(|q| { .map(|q| {
let mut buffer = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE * 4); let mut buffer = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE * 4);
write_spog_quad( write_term(&mut buffer, map_term(q.subject, hashes));
&mut buffer, write_term(&mut buffer, map_term(q.predicate, hashes));
&EncodedQuad::new( write_term(&mut buffer, map_term(q.object, hashes));
map_term(q.subject, hashes), write_term(&mut buffer, map_term(q.graph_name, hashes));
map_term(q.predicate, hashes),
map_term(q.object, hashes),
map_term(q.graph_name, hashes),
),
);
buffer buffer
}) })
.collect(); .collect();

@ -14,7 +14,7 @@ use std::convert::Infallible;
use std::error::Error; use std::error::Error;
use std::hash::Hash; use std::hash::Hash;
use std::hash::Hasher; use std::hash::Hasher;
use std::io::{Cursor, Error as IoError, ErrorKind, Read, Result as IoResult}; use std::io::{Error as IoError, ErrorKind, Read, Result as IoResult};
use std::mem::size_of; use std::mem::size_of;
use std::str; use std::str;
@ -858,72 +858,6 @@ pub fn write_term(sink: &mut Vec<u8>, term: EncodedTerm) {
} }
} }
pub fn write_spog_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.graph_name);
}
pub fn write_posg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.graph_name);
}
pub fn write_ospg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.graph_name);
}
pub fn write_gspo_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
}
pub fn write_gpos_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
}
pub fn write_gosp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
}
#[derive(Clone, Copy)]
pub enum QuadEncoding {
SPOG,
POSG,
OSPG,
GSPO,
GPOS,
GOSP,
}
impl QuadEncoding {
pub fn decode(self, buffer: &[u8]) -> IoResult<EncodedQuad> {
let mut cursor = Cursor::new(&buffer);
match self {
QuadEncoding::SPOG => cursor.read_spog_quad(),
QuadEncoding::POSG => cursor.read_posg_quad(),
QuadEncoding::OSPG => cursor.read_ospg_quad(),
QuadEncoding::GSPO => cursor.read_gspo_quad(),
QuadEncoding::GPOS => cursor.read_gpos_quad(),
QuadEncoding::GOSP => cursor.read_gosp_quad(),
}
}
}
pub trait StrLookup { pub trait StrLookup {
type Error: Error + Into<OxError>; type Error: Error + Into<OxError>;

@ -6,7 +6,7 @@ use crate::store::numeric_encoder::*;
use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore}; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore};
use crate::{DatasetSyntax, GraphSyntax, Result}; use crate::{DatasetSyntax, GraphSyntax, Result};
use rocksdb::*; use rocksdb::*;
use std::io::BufRead; use std::io::{BufRead, Cursor};
use std::mem::take; use std::mem::take;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
@ -530,7 +530,7 @@ impl RocksDbPreparedQuery {
} }
} }
/// Allows to insert and delete quads during a transaction with the `RocksDbStore`. /// Allows inserting and deleting quads during a transaction with the `RocksDbStore`.
pub struct RocksDbTransaction<'a> { pub struct RocksDbTransaction<'a> {
inner: RocksDbInnerTransaction<'a>, inner: RocksDbInnerTransaction<'a>,
} }
@ -781,6 +781,72 @@ impl<'a> Iterator for DecodingIndexIterator<'a> {
} }
} }
fn write_spog_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.graph_name);
}
fn write_posg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.graph_name);
}
fn write_ospg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.graph_name);
}
fn write_gspo_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
}
fn write_gpos_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
}
fn write_gosp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.graph_name);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
}
#[derive(Clone, Copy)]
enum QuadEncoding {
SPOG,
POSG,
OSPG,
GSPO,
GPOS,
GOSP,
}
impl QuadEncoding {
fn decode(self, buffer: &[u8]) -> Result<EncodedQuad> {
let mut cursor = Cursor::new(&buffer);
Ok(match self {
QuadEncoding::SPOG => cursor.read_spog_quad(),
QuadEncoding::POSG => cursor.read_posg_quad(),
QuadEncoding::OSPG => cursor.read_ospg_quad(),
QuadEncoding::GSPO => cursor.read_gspo_quad(),
QuadEncoding::GPOS => cursor.read_gpos_quad(),
QuadEncoding::GOSP => cursor.read_gosp_quad(),
}?)
}
}
#[test] #[test]
fn store() -> Result<()> { fn store() -> Result<()> {
use crate::model::*; use crate::model::*;

@ -1,12 +1,14 @@
//! Store based on the [Sled](https://sled.rs/) key-value database. //! Store based on the [Sled](https://sled.rs/) key-value database.
use crate::error::UnwrapInfallible;
use crate::model::*; use crate::model::*;
use crate::sparql::{GraphPattern, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::sparql::{GraphPattern, QueryOptions, QueryResult, SimplePreparedQuery};
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore}; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore};
use crate::{DatasetSyntax, Error, GraphSyntax, Result}; use crate::{DatasetSyntax, Error, GraphSyntax, Result};
use sled::{Config, Iter, Tree}; use sled::{Batch, Config, Iter, Tree};
use std::io::BufRead; use std::convert::Infallible;
use std::io::{BufRead, Cursor};
use std::path::Path; use std::path::Path;
use std::{fmt, str}; use std::{fmt, str};
@ -49,14 +51,16 @@ use std::{fmt, str};
#[derive(Clone)] #[derive(Clone)]
pub struct SledStore { pub struct SledStore {
id2str: Tree, id2str: Tree,
spog: Tree, quads: Tree,
posg: Tree,
ospg: Tree,
gspo: Tree,
gpos: Tree,
gosp: Tree,
} }
const SPOG_PREFIX: u8 = 1;
const POSG_PREFIX: u8 = 2;
const OSPG_PREFIX: u8 = 3;
const GSPO_PREFIX: u8 = 4;
const GPOS_PREFIX: u8 = 5;
const GOSP_PREFIX: u8 = 6;
//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) //TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving)
impl SledStore { impl SledStore {
@ -74,14 +78,9 @@ impl SledStore {
let db = config.open()?; let db = config.open()?;
let new = Self { let new = Self {
id2str: db.open_tree("id2str")?, id2str: db.open_tree("id2str")?,
spog: db.open_tree("spog")?, quads: db.open_tree("quads")?,
posg: db.open_tree("posg")?,
ospg: db.open_tree("ospg")?,
gspo: db.open_tree("gspo")?,
gpos: db.open_tree("gpos")?,
gosp: db.open_tree("gosp")?,
}; };
(&new).set_first_strings()?; DirectWriter::new(&new).set_first_strings()?;
Ok(new) Ok(new)
} }
@ -140,12 +139,29 @@ impl SledStore {
/// Returns the number of quads in the store /// Returns the number of quads in the store
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.spog.len() self.quads.len() / 6
} }
/// Returns if the store is empty /// Returns if the store is empty
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.spog.is_empty() self.quads.is_empty()
}
/// Executes a transaction.
///
/// The transaction is executed if the given closure returns `Ok`.
/// Nothing is done if the closure returns `Err`.
///
/// See `MemoryStore` for a usage example.
pub fn transaction<'a>(
&'a self,
f: impl FnOnce(&mut SledTransaction<'a>) -> Result<()>,
) -> Result<()> {
let mut transaction = SledTransaction {
inner: BatchWriter::new(self),
};
f(&mut transaction)?;
transaction.inner.apply()
} }
/// Loads a graph file (i.e. triples) into the store /// Loads a graph file (i.e. triples) into the store
@ -161,8 +177,13 @@ impl SledStore {
to_graph_name: &GraphName, to_graph_name: &GraphName,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
let mut store = self; load_graph(
load_graph(&mut store, reader, syntax, to_graph_name, base_iri) &mut DirectWriter::new(self),
reader,
syntax,
to_graph_name,
base_iri,
)
} }
/// Loads a dataset file (i.e. quads) into the store. /// Loads a dataset file (i.e. quads) into the store.
@ -177,28 +198,26 @@ impl SledStore {
syntax: DatasetSyntax, syntax: DatasetSyntax,
base_iri: Option<&str>, base_iri: Option<&str>,
) -> Result<()> { ) -> Result<()> {
let mut store = self; load_dataset(&mut DirectWriter::new(self), reader, syntax, base_iri)
load_dataset(&mut store, reader, syntax, base_iri)
} }
/// Adds a quad to this store. /// Adds a quad to this store.
pub fn insert(&self, quad: &Quad) -> Result<()> { pub fn insert(&self, quad: &Quad) -> Result<()> {
let mut store = self; let mut writer = DirectWriter::new(self);
let quad = store.encode_quad(quad)?; let quad = writer.encode_quad(quad)?;
store.insert_encoded(&quad) writer.insert_encoded(&quad)
} }
/// Removes a quad from this store. /// Removes a quad from this store.
pub fn remove(&self, quad: &Quad) -> Result<()> { pub fn remove(&self, quad: &Quad) -> Result<()> {
let mut store = self;
let quad = quad.into(); let quad = quad.into();
store.remove_encoded(&quad) DirectWriter::new(self).remove_encoded(&quad)
} }
fn contains_encoded(&self, quad: &EncodedQuad) -> Result<bool> { fn contains_encoded(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_spog_quad(&mut buffer, quad); write_spog_quad(&mut buffer, quad);
Ok(self.spog.contains_key(buffer)?) Ok(self.quads.contains_key(buffer)?)
} }
fn encoded_quads_for_pattern_inner( fn encoded_quads_for_pattern_inner(
@ -212,8 +231,13 @@ impl SledStore {
Some(subject) => match predicate { Some(subject) => match predicate {
Some(predicate) => match object { Some(predicate) => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => self Some(graph_name) => self.inner_quads(encode_term_quad(
.spog_quads(encode_term_quad(subject, predicate, object, graph_name)), SPOG_PREFIX,
subject,
predicate,
object,
graph_name,
)),
None => self.quads_for_subject_predicate_object(subject, predicate, object), None => self.quads_for_subject_predicate_object(subject, predicate, object),
}, },
None => match graph_name { None => match graph_name {
@ -264,11 +288,11 @@ impl SledStore {
} }
fn quads(&self) -> DecodingQuadIterator { fn quads(&self) -> DecodingQuadIterator {
self.spog_quads(Vec::default()) self.inner_quads(&[SPOG_PREFIX])
} }
fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingQuadIterator { fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingQuadIterator {
self.spog_quads(encode_term(subject)) self.inner_quads(encode_term(SPOG_PREFIX, subject))
} }
fn quads_for_subject_predicate( fn quads_for_subject_predicate(
@ -276,7 +300,7 @@ impl SledStore {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.spog_quads(encode_term_pair(subject, predicate)) self.inner_quads(encode_term_pair(SPOG_PREFIX, subject, predicate))
} }
fn quads_for_subject_predicate_object( fn quads_for_subject_predicate_object(
@ -285,7 +309,7 @@ impl SledStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.spog_quads(encode_term_triple(subject, predicate, object)) self.inner_quads(encode_term_triple(SPOG_PREFIX, subject, predicate, object))
} }
fn quads_for_subject_object( fn quads_for_subject_object(
@ -293,11 +317,11 @@ impl SledStore {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.ospg_quads(encode_term_pair(object, subject)) self.inner_quads(encode_term_pair(OSPG_PREFIX, object, subject))
} }
fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingQuadIterator { fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingQuadIterator {
self.posg_quads(encode_term(predicate)) self.inner_quads(encode_term(POSG_PREFIX, predicate))
} }
fn quads_for_predicate_object( fn quads_for_predicate_object(
@ -305,15 +329,15 @@ impl SledStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.posg_quads(encode_term_pair(predicate, object)) self.inner_quads(encode_term_pair(POSG_PREFIX, predicate, object))
} }
fn quads_for_object(&self, object: EncodedTerm) -> DecodingQuadIterator { fn quads_for_object(&self, object: EncodedTerm) -> DecodingQuadIterator {
self.ospg_quads(encode_term(object)) self.inner_quads(encode_term(OSPG_PREFIX, object))
} }
fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingQuadIterator { fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingQuadIterator {
self.gspo_quads(encode_term(graph_name)) self.inner_quads(encode_term(GSPO_PREFIX, graph_name))
} }
fn quads_for_subject_graph( fn quads_for_subject_graph(
@ -321,7 +345,7 @@ impl SledStore {
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gspo_quads(encode_term_pair(graph_name, subject)) self.inner_quads(encode_term_pair(GSPO_PREFIX, graph_name, subject))
} }
fn quads_for_subject_predicate_graph( fn quads_for_subject_predicate_graph(
@ -330,7 +354,12 @@ impl SledStore {
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) self.inner_quads(encode_term_triple(
GSPO_PREFIX,
graph_name,
subject,
predicate,
))
} }
fn quads_for_subject_object_graph( fn quads_for_subject_object_graph(
@ -339,7 +368,7 @@ impl SledStore {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gosp_quads(encode_term_triple(graph_name, object, subject)) self.inner_quads(encode_term_triple(GOSP_PREFIX, graph_name, object, subject))
} }
fn quads_for_predicate_graph( fn quads_for_predicate_graph(
@ -347,7 +376,7 @@ impl SledStore {
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gpos_quads(encode_term_pair(graph_name, predicate)) self.inner_quads(encode_term_pair(GPOS_PREFIX, graph_name, predicate))
} }
fn quads_for_predicate_object_graph( fn quads_for_predicate_object_graph(
@ -356,7 +385,12 @@ impl SledStore {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gpos_quads(encode_term_triple(graph_name, predicate, object)) self.inner_quads(encode_term_triple(
GPOS_PREFIX,
graph_name,
predicate,
object,
))
} }
fn quads_for_object_graph( fn quads_for_object_graph(
@ -364,42 +398,12 @@ impl SledStore {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingQuadIterator { ) -> DecodingQuadIterator {
self.gosp_quads(encode_term_pair(graph_name, object)) self.inner_quads(encode_term_pair(GPOS_PREFIX, graph_name, object))
}
fn spog_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
self.inner_quads(&self.spog, prefix, QuadEncoding::SPOG)
}
fn posg_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
self.inner_quads(&self.posg, prefix, QuadEncoding::POSG)
} }
fn ospg_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator { fn inner_quads(&self, prefix: impl AsRef<[u8]>) -> DecodingQuadIterator {
self.inner_quads(&self.ospg, prefix, QuadEncoding::OSPG)
}
fn gspo_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
self.inner_quads(&self.gspo, prefix, QuadEncoding::GSPO)
}
fn gpos_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
self.inner_quads(&self.gpos, prefix, QuadEncoding::GPOS)
}
fn gosp_quads(&self, prefix: Vec<u8>) -> DecodingQuadIterator {
self.inner_quads(&self.gosp, prefix, QuadEncoding::GOSP)
}
fn inner_quads(
&self,
tree: &Tree,
prefix: Vec<u8>,
order: QuadEncoding,
) -> DecodingQuadIterator {
DecodingQuadIterator { DecodingQuadIterator {
iter: tree.scan_prefix(prefix), iter: self.quads.scan_prefix(prefix),
order,
} }
} }
} }
@ -437,81 +441,238 @@ impl ReadableEncodedStore for SledStore {
} }
} }
impl<'a> StrContainer for &'a SledStore { struct DirectWriter<'a> {
store: &'a SledStore,
buffer: Vec<u8>,
}
impl<'a> DirectWriter<'a> {
fn new(store: &'a SledStore) -> Self {
Self {
store,
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1),
}
}
}
impl<'a> StrContainer for DirectWriter<'a> {
type Error = Error; type Error = Error;
fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> {
self.id2str.insert(key.to_be_bytes(), value)?; self.store
.id2str
.insert(key.to_be_bytes().as_ref(), value)?;
Ok(()) Ok(())
} }
} }
impl<'a> WritableEncodedStore for &'a SledStore { impl<'a> WritableEncodedStore for DirectWriter<'a> {
type Error = Error; type Error = Error;
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
//TODO: atomicity write_spog_quad(&mut self.buffer, quad);
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); self.store.quads.insert(self.buffer.as_slice(), &[])?;
self.buffer.clear();
write_spog_quad(&mut buffer, quad); write_posg_quad(&mut self.buffer, quad);
self.spog.insert(&buffer, &[])?; self.store.quads.insert(self.buffer.as_slice(), &[])?;
buffer.clear(); self.buffer.clear();
write_posg_quad(&mut buffer, quad);
self.posg.insert(&buffer, &[])?;
buffer.clear();
write_ospg_quad(&mut buffer, quad); write_ospg_quad(&mut self.buffer, quad);
self.ospg.insert(&buffer, &[])?; self.store.quads.insert(self.buffer.as_slice(), &[])?;
buffer.clear(); self.buffer.clear();
write_gspo_quad(&mut buffer, quad); write_gspo_quad(&mut self.buffer, quad);
self.gspo.insert(&buffer, &[])?; self.store.quads.insert(self.buffer.as_slice(), &[])?;
buffer.clear(); self.buffer.clear();
write_gpos_quad(&mut buffer, quad); write_gpos_quad(&mut self.buffer, quad);
self.gpos.insert(&buffer, &[])?; self.store.quads.insert(self.buffer.as_slice(), &[])?;
buffer.clear(); self.buffer.clear();
write_gosp_quad(&mut buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.gosp.insert(&buffer, &[])?; self.store.quads.insert(self.buffer.as_slice(), &[])?;
buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
} }
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> {
//TODO: atomicity write_spog_quad(&mut self.buffer, quad);
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_spog_quad(&mut buffer, quad); write_posg_quad(&mut self.buffer, quad);
self.spog.remove(&buffer)?; self.store.quads.remove(self.buffer.as_slice())?;
buffer.clear(); self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.store.quads.remove(self.buffer.as_slice())?;
self.buffer.clear();
Ok(())
}
}
struct BatchWriter<'a> {
store: &'a SledStore,
quads: Batch,
id2str: Batch,
buffer: Vec<u8>,
}
impl<'a> BatchWriter<'a> {
fn new(store: &'a SledStore) -> Self {
Self {
store,
quads: Batch::default(),
id2str: Batch::default(),
buffer: Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1),
}
}
}
impl<'a> BatchWriter<'a> {
fn apply(self) -> Result<()> {
self.store.id2str.apply_batch(self.id2str)?;
self.store.quads.apply_batch(self.quads)?;
Ok(())
}
}
impl<'a> StrContainer for BatchWriter<'a> {
type Error = Infallible;
fn insert_str(&mut self, key: StrHash, value: &str) -> std::result::Result<(), Infallible> {
self.id2str.insert(key.to_be_bytes().as_ref(), value);
Ok(())
}
}
impl<'a> WritableEncodedStore for BatchWriter<'a> {
type Error = Infallible;
fn insert_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> {
write_spog_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_posg_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_ospg_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gspo_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gpos_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
write_gosp_quad(&mut self.buffer, quad);
self.quads.insert(self.buffer.as_slice(), &[]);
self.buffer.clear();
Ok(())
}
fn remove_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> {
write_spog_quad(&mut self.buffer, quad);
self.quads.remove(self.buffer.as_slice());
self.buffer.clear();
write_posg_quad(&mut buffer, quad); write_posg_quad(&mut self.buffer, quad);
self.posg.remove(&buffer)?; self.quads.remove(self.buffer.as_slice());
buffer.clear(); self.buffer.clear();
write_ospg_quad(&mut buffer, quad); write_ospg_quad(&mut self.buffer, quad);
self.ospg.remove(&buffer)?; self.quads.remove(self.buffer.as_slice());
buffer.clear(); self.buffer.clear();
write_gspo_quad(&mut buffer, quad); write_gspo_quad(&mut self.buffer, quad);
self.gspo.remove(&buffer)?; self.quads.remove(self.buffer.as_slice());
buffer.clear(); self.buffer.clear();
write_gpos_quad(&mut buffer, quad); write_gpos_quad(&mut self.buffer, quad);
self.gpos.remove(&buffer)?; self.quads.remove(self.buffer.as_slice());
buffer.clear(); self.buffer.clear();
write_gosp_quad(&mut buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.gosp.remove(&buffer)?; self.quads.remove(self.buffer.as_slice());
buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
} }
} }
/// Allows inserting and deleting quads during a transaction with the `SeldStore`.
pub struct SledTransaction<'a> {
inner: BatchWriter<'a>,
}
impl SledTransaction<'_> {
/// Loads a graph file (i.e. triples) into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// See `MemoryTransaction` for a usage example.
pub fn load_graph(
&mut self,
reader: impl BufRead,
syntax: GraphSyntax,
to_graph_name: &GraphName,
base_iri: Option<&str>,
) -> Result<()> {
load_graph(&mut self.inner, reader, syntax, to_graph_name, base_iri)
}
/// Loads a dataset file (i.e. quads) into the store. into the store during the transaction.
///
/// Warning: Because the load happens during a transaction,
/// the full file content might be temporarily stored in main memory.
/// Do not use for big files.
///
/// See `MemoryTransaction` for a usage example.
pub fn load_dataset(
&mut self,
reader: impl BufRead,
syntax: DatasetSyntax,
base_iri: Option<&str>,
) -> Result<()> {
load_dataset(&mut self.inner, reader, syntax, base_iri)
}
/// Adds a quad to this store during the transaction.
pub fn insert(&mut self, quad: &Quad) {
let quad = self.inner.encode_quad(quad).unwrap_infallible();
self.inner.insert_encoded(&quad).unwrap_infallible()
}
/// Removes a quad from this store during the transaction.
pub fn remove(&mut self, quad: &Quad) {
let quad = quad.into();
self.inner.remove_encoded(&quad).unwrap_infallible()
}
}
/// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) for the `SledStore`. /// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) for the `SledStore`.
pub struct SledPreparedQuery(SimplePreparedQuery<SledStore>); pub struct SledPreparedQuery(SimplePreparedQuery<SledStore>);
@ -522,29 +683,39 @@ impl SledPreparedQuery {
} }
} }
fn encode_term(t: EncodedTerm) -> Vec<u8> { fn encode_term(prefix: u8, t: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE); let mut vec = Vec::with_capacity(WRITTEN_TERM_MAX_SIZE + 1);
vec.push(prefix);
write_term(&mut vec, t); write_term(&mut vec, t);
vec vec
} }
fn encode_term_pair(t1: EncodedTerm, t2: EncodedTerm) -> Vec<u8> { fn encode_term_pair(prefix: u8, t1: EncodedTerm, t2: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE); let mut vec = Vec::with_capacity(2 * WRITTEN_TERM_MAX_SIZE + 1);
vec.push(prefix);
write_term(&mut vec, t1); write_term(&mut vec, t1);
write_term(&mut vec, t2); write_term(&mut vec, t2);
vec vec
} }
fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Vec<u8> { fn encode_term_triple(prefix: u8, t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Vec<u8> {
let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE); let mut vec = Vec::with_capacity(3 * WRITTEN_TERM_MAX_SIZE + 1);
vec.push(prefix);
write_term(&mut vec, t1); write_term(&mut vec, t1);
write_term(&mut vec, t2); write_term(&mut vec, t2);
write_term(&mut vec, t3); write_term(&mut vec, t3);
vec vec
} }
fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: EncodedTerm) -> Vec<u8> { fn encode_term_quad(
let mut vec = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); prefix: u8,
t1: EncodedTerm,
t2: EncodedTerm,
t3: EncodedTerm,
t4: EncodedTerm,
) -> Vec<u8> {
let mut vec = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE + 1);
vec.push(prefix);
write_term(&mut vec, t1); write_term(&mut vec, t1);
write_term(&mut vec, t2); write_term(&mut vec, t2);
write_term(&mut vec, t3); write_term(&mut vec, t3);
@ -554,7 +725,6 @@ fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: Encod
struct DecodingQuadIterator { struct DecodingQuadIterator {
iter: Iter, iter: Iter,
order: QuadEncoding,
} }
impl Iterator for DecodingQuadIterator { impl Iterator for DecodingQuadIterator {
@ -562,12 +732,73 @@ impl Iterator for DecodingQuadIterator {
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
Some(match self.iter.next()? { Some(match self.iter.next()? {
Ok((encoded, _)) => self.order.decode(&encoded).map_err(Error::from), Ok((encoded, _)) => decode_quad(&encoded),
Err(error) => Err(error.into()), Err(error) => Err(error.into()),
}) })
} }
} }
fn write_spog_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(SPOG_PREFIX);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.graph_name);
}
fn write_posg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(POSG_PREFIX);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.graph_name);
}
fn write_ospg_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(OSPG_PREFIX);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.graph_name);
}
fn write_gspo_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(GSPO_PREFIX);
write_term(sink, quad.graph_name);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
}
fn write_gpos_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(GPOS_PREFIX);
write_term(sink, quad.graph_name);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
}
fn write_gosp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
sink.push(GOSP_PREFIX);
write_term(sink, quad.graph_name);
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
}
fn decode_quad(encoded: &[u8]) -> Result<EncodedQuad> {
let mut cursor = Cursor::new(&encoded[1..]);
match encoded[0] {
SPOG_PREFIX => Ok(cursor.read_spog_quad()?),
POSG_PREFIX => Ok(cursor.read_posg_quad()?),
OSPG_PREFIX => Ok(cursor.read_ospg_quad()?),
GSPO_PREFIX => Ok(cursor.read_gspo_quad()?),
GPOS_PREFIX => Ok(cursor.read_gpos_quad()?),
GOSP_PREFIX => Ok(cursor.read_gosp_quad()?),
_ => Err(Error::msg("Invalid quad type identifier")),
}
}
#[test] #[test]
fn store() -> Result<()> { fn store() -> Result<()> {
use crate::model::*; use crate::model::*;

Loading…
Cancel
Save