Breaking: RocksDBStore: use 3 indexes instead of 6 for the default graph

pull/46/head
Tpt 4 years ago
parent a96ef43e4e
commit 09d0089910
  1. 198
      lib/src/store/numeric_encoder.rs
  2. 421
      lib/src/store/rocksdb.rs

@ -480,12 +480,120 @@ impl<I: StrId> EncodedQuad<I> {
pub trait TermReader { pub trait TermReader {
fn read_term(&mut self) -> Result<EncodedTerm<StrHash>, io::Error>; fn read_term(&mut self) -> Result<EncodedTerm<StrHash>, io::Error>;
fn read_spog_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>;
fn read_posg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>; fn read_spog_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
fn read_ospg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>; let subject = self.read_term()?;
fn read_gspo_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>; let predicate = self.read_term()?;
fn read_gpos_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>; let object = self.read_term()?;
fn read_gosp_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error>; let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_posg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_ospg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gspo_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gpos_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gosp_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_dspo_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name: EncodedTerm::DefaultGraph,
})
}
fn read_dpos_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name: EncodedTerm::DefaultGraph,
})
}
fn read_dosp_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name: EncodedTerm::DefaultGraph,
})
}
} }
impl<R: Read> TermReader for R { impl<R: Read> TermReader for R {
@ -605,84 +713,6 @@ impl<R: Read> TermReader for R {
_ => Err(invalid_data_error("the term buffer has an invalid type id")), _ => Err(invalid_data_error("the term buffer has an invalid type id")),
} }
} }
fn read_spog_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_posg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_ospg_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
let graph_name = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gspo_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gpos_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let predicate = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
fn read_gosp_quad(&mut self) -> Result<EncodedQuad<StrHash>, io::Error> {
let graph_name = self.read_term()?;
let object = self.read_term()?;
let subject = self.read_term()?;
let predicate = self.read_term()?;
Ok(EncodedQuad {
subject,
predicate,
object,
graph_name,
})
}
} }
pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>(); pub const WRITTEN_TERM_MAX_SIZE: usize = size_of::<u8>() + 2 * size_of::<StrHash>();

@ -71,11 +71,12 @@ const OSPG_CF: &str = "ospg";
const GSPO_CF: &str = "gspo"; const GSPO_CF: &str = "gspo";
const GPOS_CF: &str = "gpos"; const GPOS_CF: &str = "gpos";
const GOSP_CF: &str = "gosp"; const GOSP_CF: &str = "gosp";
const DSPO_CF: &str = "dspo";
const DPOS_CF: &str = "dpos";
const DOSP_CF: &str = "dosp";
//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) const COLUMN_FAMILIES: [&str; 10] = [
ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF, DSPO_CF, DPOS_CF, DOSP_CF,
const COLUMN_FAMILIES: [&str; 7] = [
ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF,
]; ];
const MAX_TRANSACTION_SIZE: usize = 1024; const MAX_TRANSACTION_SIZE: usize = 1024;
@ -153,17 +154,30 @@ impl RocksDbStore {
/// ///
/// Warning: this function executes a full scan /// Warning: this function executes a full scan
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
self.db let default = self
.full_iterator_cf(self.spog_cf(), IteratorMode::Start) .db
.count() .full_iterator_cf(self.dspo_cf(), IteratorMode::Start)
.count();
let named = self
.db
.full_iterator_cf(self.gspo_cf(), IteratorMode::Start)
.count();
default + named
} }
/// Returns if the store is empty /// Returns if the store is empty
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
self.db let default = self
.full_iterator_cf(self.spog_cf(), IteratorMode::Start) .db
.full_iterator_cf(self.dspo_cf(), IteratorMode::Start)
.next() .next()
.is_none() .is_none();
let named = self
.db
.full_iterator_cf(self.gspo_cf(), IteratorMode::Start)
.next()
.is_none();
default && named
} }
/// Executes an ACID transaction. /// Executes an ACID transaction.
@ -314,6 +328,18 @@ impl RocksDbStore {
get_cf(&self.db, GOSP_CF) get_cf(&self.db, GOSP_CF)
} }
fn dspo_cf(&self) -> &ColumnFamily {
get_cf(&self.db, DSPO_CF)
}
fn dpos_cf(&self) -> &ColumnFamily {
get_cf(&self.db, DPOS_CF)
}
fn dosp_cf(&self) -> &ColumnFamily {
get_cf(&self.db, DOSP_CF)
}
fn auto_batch_writer(&self) -> AutoBatchWriter<'_> { fn auto_batch_writer(&self) -> AutoBatchWriter<'_> {
AutoBatchWriter { AutoBatchWriter {
store: self, store: self,
@ -324,28 +350,46 @@ impl RocksDbStore {
fn contains_encoded(&self, quad: &EncodedQuad) -> Result<bool, io::Error> { fn contains_encoded(&self, quad: &EncodedQuad) -> Result<bool, io::Error> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
write_spog_quad(&mut buffer, quad); if quad.graph_name.is_default_graph() {
write_spo_quad(&mut buffer, quad);
Ok(self Ok(self
.db .db
.get_pinned_cf(self.spog_cf(), &buffer) .get_pinned_cf(self.dspo_cf(), &buffer)
.map_err(map_err)? .map_err(map_err)?
.is_some()) .is_some())
} else {
write_gspo_quad(&mut buffer, quad);
Ok(self
.db
.get_pinned_cf(self.gspo_cf(), &buffer)
.map_err(map_err)?
.is_some())
}
} }
fn quads(&self) -> DecodingIndexIterator { fn quads(&self) -> DecodingIndexesIterator {
self.spog_quads(Vec::default()) DecodingIndexesIterator::pair(
self.dspo_quads(Vec::default()),
self.gspo_quads(Vec::default()),
)
} }
fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator { fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexesIterator {
self.spog_quads(encode_term(subject)) DecodingIndexesIterator::pair(
self.dspo_quads(encode_term(subject)),
self.spog_quads(encode_term(subject)),
)
} }
fn quads_for_subject_predicate( fn quads_for_subject_predicate(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
self.spog_quads(encode_term_pair(subject, predicate)) DecodingIndexesIterator::pair(
self.dspo_quads(encode_term_pair(subject, predicate)),
self.spog_quads(encode_term_pair(subject, predicate)),
)
} }
fn quads_for_subject_predicate_object( fn quads_for_subject_predicate_object(
@ -353,44 +397,67 @@ impl RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
self.spog_quads(encode_term_triple(subject, predicate, object)) DecodingIndexesIterator::pair(
self.dspo_quads(encode_term_triple(subject, predicate, object)),
self.spog_quads(encode_term_triple(subject, predicate, object)),
)
} }
fn quads_for_subject_object( fn quads_for_subject_object(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
self.ospg_quads(encode_term_pair(object, subject)) DecodingIndexesIterator::pair(
self.dosp_quads(encode_term_pair(object, subject)),
self.ospg_quads(encode_term_pair(object, subject)),
)
} }
fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator { fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexesIterator {
self.posg_quads(encode_term(predicate)) DecodingIndexesIterator::pair(
self.dpos_quads(encode_term(predicate)),
self.posg_quads(encode_term(predicate)),
)
} }
fn quads_for_predicate_object( fn quads_for_predicate_object(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
self.posg_quads(encode_term_pair(predicate, object)) DecodingIndexesIterator::pair(
self.dpos_quads(encode_term_pair(predicate, object)),
self.posg_quads(encode_term_pair(predicate, object)),
)
} }
fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator { fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexesIterator {
self.ospg_quads(encode_term(object)) DecodingIndexesIterator::pair(
self.dosp_quads(encode_term(object)),
self.ospg_quads(encode_term(object)),
)
} }
fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator { fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(Vec::default())
} else {
self.gspo_quads(encode_term(graph_name)) self.gspo_quads(encode_term(graph_name))
})
} }
fn quads_for_subject_graph( fn quads_for_subject_graph(
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term(subject))
} else {
self.gspo_quads(encode_term_pair(graph_name, subject)) self.gspo_quads(encode_term_pair(graph_name, subject))
})
} }
fn quads_for_subject_predicate_graph( fn quads_for_subject_predicate_graph(
@ -398,8 +465,26 @@ impl RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term_pair(subject, predicate))
} else {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) self.gspo_quads(encode_term_triple(graph_name, subject, predicate))
})
}
fn quads_for_subject_predicate_object_graph(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
object: EncodedTerm,
graph_name: EncodedTerm,
) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dspo_quads(encode_term_triple(subject, predicate, object))
} else {
self.gspo_quads(encode_term_quad(graph_name, subject, predicate, object))
})
} }
fn quads_for_subject_object_graph( fn quads_for_subject_object_graph(
@ -407,16 +492,24 @@ impl RocksDbStore {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dosp_quads(encode_term_pair(object, subject))
} else {
self.gosp_quads(encode_term_triple(graph_name, object, subject)) self.gosp_quads(encode_term_triple(graph_name, object, subject))
})
} }
fn quads_for_predicate_graph( fn quads_for_predicate_graph(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dpos_quads(encode_term(predicate))
} else {
self.gpos_quads(encode_term_pair(graph_name, predicate)) self.gpos_quads(encode_term_pair(graph_name, predicate))
})
} }
fn quads_for_predicate_object_graph( fn quads_for_predicate_object_graph(
@ -424,16 +517,24 @@ impl RocksDbStore {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dpos_quads(encode_term_pair(predicate, object))
} else {
self.gpos_quads(encode_term_triple(graph_name, predicate, object)) self.gpos_quads(encode_term_triple(graph_name, predicate, object))
})
} }
fn quads_for_object_graph( fn quads_for_object_graph(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
DecodingIndexesIterator::new(if graph_name.is_default_graph() {
self.dosp_quads(encode_term(object))
} else {
self.gosp_quads(encode_term_pair(graph_name, object)) self.gosp_quads(encode_term_pair(graph_name, object))
})
} }
fn spog_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator { fn spog_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
@ -460,6 +561,18 @@ impl RocksDbStore {
self.inner_quads(self.gosp_cf(), prefix, QuadEncoding::GOSP) self.inner_quads(self.gosp_cf(), prefix, QuadEncoding::GOSP)
} }
fn dspo_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.dspo_cf(), prefix, QuadEncoding::DSPO)
}
fn dpos_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.dpos_cf(), prefix, QuadEncoding::DPOS)
}
fn dosp_quads(&self, prefix: Vec<u8>) -> DecodingIndexIterator {
self.inner_quads(self.dosp_cf(), prefix, QuadEncoding::DOSP)
}
#[allow(unsafe_code)] #[allow(unsafe_code)]
fn inner_quads( fn inner_quads(
&self, &self,
@ -519,7 +632,7 @@ impl StrLookup for RocksDbStore {
} }
impl ReadableEncodedStore for RocksDbStore { impl ReadableEncodedStore for RocksDbStore {
type QuadsIter = DecodingIndexIterator; type QuadsIter = DecodingIndexesIterator;
fn encoded_quads_for_pattern( fn encoded_quads_for_pattern(
&self, &self,
@ -527,13 +640,14 @@ impl ReadableEncodedStore for RocksDbStore {
predicate: Option<EncodedTerm>, predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>, object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>, graph_name: Option<EncodedTerm>,
) -> DecodingIndexIterator { ) -> DecodingIndexesIterator {
match subject { match subject {
Some(subject) => match predicate { Some(subject) => match predicate {
Some(predicate) => match object { Some(predicate) => match object {
Some(object) => match graph_name { Some(object) => match graph_name {
Some(graph_name) => self Some(graph_name) => self.quads_for_subject_predicate_object_graph(
.spog_quads(encode_term_quad(subject, predicate, object, graph_name)), subject, predicate, object, graph_name,
),
None => self.quads_for_subject_predicate_object(subject, predicate, object), None => self.quads_for_subject_predicate_object(subject, predicate, object),
}, },
None => match graph_name { None => match graph_name {
@ -633,6 +747,19 @@ impl StrContainer for AutoBatchWriter<'_> {
impl WritableEncodedStore for AutoBatchWriter<'_> { impl WritableEncodedStore for AutoBatchWriter<'_> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dspo_cf(), &self.buffer, &[]);
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dpos_cf(), &self.buffer, &[]);
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dosp_cf(), &self.buffer, &[]);
self.buffer.clear();
} else {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]); self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
@ -656,11 +783,25 @@ impl WritableEncodedStore for AutoBatchWriter<'_> {
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]); self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
}
self.apply_if_big() self.apply_if_big()
} }
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dspo_cf(), &self.buffer);
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dpos_cf(), &self.buffer);
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dosp_cf(), &self.buffer);
self.buffer.clear();
} else {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.spog_cf(), &self.buffer); self.batch.delete_cf(self.store.spog_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
@ -684,6 +825,7 @@ impl WritableEncodedStore for AutoBatchWriter<'_> {
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.gosp_cf(), &self.buffer); self.batch.delete_cf(self.store.gosp_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
}
self.apply_if_big() self.apply_if_big()
} }
@ -813,6 +955,19 @@ impl StrContainer for RocksDbTransaction<'_> {
impl WritableEncodedStore for RocksDbTransaction<'_> { impl WritableEncodedStore for RocksDbTransaction<'_> {
fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dspo_cf(), &self.buffer, &[]);
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dpos_cf(), &self.buffer, &[]);
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.dosp_cf(), &self.buffer, &[]);
self.buffer.clear();
} else {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]); self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
@ -836,11 +991,25 @@ impl WritableEncodedStore for RocksDbTransaction<'_> {
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]); self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
}
Ok(()) Ok(())
} }
fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> { fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<(), io::Error> {
if quad.graph_name.is_default_graph() {
write_spo_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dspo_cf(), &self.buffer);
self.buffer.clear();
write_pos_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dpos_cf(), &self.buffer);
self.buffer.clear();
write_osp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.dosp_cf(), &self.buffer);
self.buffer.clear();
} else {
write_spog_quad(&mut self.buffer, quad); write_spog_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.spog_cf(), &self.buffer); self.batch.delete_cf(self.store.spog_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
@ -864,6 +1033,7 @@ impl WritableEncodedStore for RocksDbTransaction<'_> {
write_gosp_quad(&mut self.buffer, quad); write_gosp_quad(&mut self.buffer, quad);
self.batch.delete_cf(self.store.gosp_cf(), &self.buffer); self.batch.delete_cf(self.store.gosp_cf(), &self.buffer);
self.buffer.clear(); self.buffer.clear();
}
Ok(()) Ok(())
} }
@ -923,9 +1093,52 @@ impl StaticDBRowIterator {
_db: db, _db: db,
} }
} }
fn key(&self) -> Option<&[u8]> {
self.iter.key()
}
fn next(&mut self) {
self.iter.next()
}
}
pub(crate) struct DecodingIndexesIterator {
first: DecodingIndexIterator,
second: Option<DecodingIndexIterator>,
}
impl DecodingIndexesIterator {
fn new(first: DecodingIndexIterator) -> Self {
Self {
first,
second: None,
}
}
fn pair(first: DecodingIndexIterator, second: DecodingIndexIterator) -> Self {
Self {
first,
second: Some(second),
}
}
} }
pub(crate) struct DecodingIndexIterator { impl Iterator for DecodingIndexesIterator {
type Item = Result<EncodedQuad, io::Error>;
fn next(&mut self) -> Option<Result<EncodedQuad, io::Error>> {
if let Some(result) = self.first.next() {
Some(result)
} else if let Some(second) = self.second.as_mut() {
second.next()
} else {
None
}
}
}
struct DecodingIndexIterator {
iter: StaticDBRowIterator, iter: StaticDBRowIterator,
prefix: Vec<u8>, prefix: Vec<u8>,
encoding: QuadEncoding, encoding: QuadEncoding,
@ -935,10 +1148,10 @@ impl Iterator for DecodingIndexIterator {
type Item = Result<EncodedQuad, io::Error>; type Item = Result<EncodedQuad, io::Error>;
fn next(&mut self) -> Option<Result<EncodedQuad, io::Error>> { fn next(&mut self) -> Option<Result<EncodedQuad, io::Error>> {
if let Some(key) = self.iter.iter.key() { if let Some(key) = self.iter.key() {
if key.starts_with(&self.prefix) { if key.starts_with(&self.prefix) {
let result = self.encoding.decode(key); let result = self.encoding.decode(key);
self.iter.iter.next(); self.iter.next();
Some(result) Some(result)
} else { } else {
None None
@ -991,6 +1204,24 @@ fn write_gosp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.predicate); write_term(sink, quad.predicate);
} }
fn write_spo_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
write_term(sink, quad.object);
}
fn write_pos_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.predicate);
write_term(sink, quad.object);
write_term(sink, quad.subject);
}
fn write_osp_quad(sink: &mut Vec<u8>, quad: &EncodedQuad) {
write_term(sink, quad.object);
write_term(sink, quad.subject);
write_term(sink, quad.predicate);
}
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
enum QuadEncoding { enum QuadEncoding {
SPOG, SPOG,
@ -999,6 +1230,9 @@ enum QuadEncoding {
GSPO, GSPO,
GPOS, GPOS,
GOSP, GOSP,
DSPO,
DPOS,
DOSP,
} }
impl QuadEncoding { impl QuadEncoding {
@ -1011,6 +1245,9 @@ impl QuadEncoding {
QuadEncoding::GSPO => cursor.read_gspo_quad(), QuadEncoding::GSPO => cursor.read_gspo_quad(),
QuadEncoding::GPOS => cursor.read_gpos_quad(), QuadEncoding::GPOS => cursor.read_gpos_quad(),
QuadEncoding::GOSP => cursor.read_gosp_quad(), QuadEncoding::GOSP => cursor.read_gosp_quad(),
QuadEncoding::DSPO => cursor.read_dspo_quad(),
QuadEncoding::DPOS => cursor.read_dpos_quad(),
QuadEncoding::DOSP => cursor.read_dosp_quad(),
} }
} }
} }
@ -1021,7 +1258,7 @@ fn map_err(e: Error) -> io::Error {
enum QuadsIter { enum QuadsIter {
Quads { Quads {
iter: DecodingIndexIterator, iter: DecodingIndexesIterator,
store: RocksDbStore, store: RocksDbStore,
}, },
Error(Once<io::Error>), Error(Once<io::Error>),
@ -1053,12 +1290,35 @@ fn store() -> Result<(), io::Error> {
let main_s = NamedOrBlankNode::from(BlankNode::default()); let main_s = NamedOrBlankNode::from(BlankNode::default());
let main_p = NamedNode::new("http://example.com").unwrap(); let main_p = NamedNode::new("http://example.com").unwrap();
let main_o = Term::from(Literal::from(1)); let main_o = Term::from(Literal::from(1));
let main_g = GraphName::from(BlankNode::default());
let main_quad = Quad::new(main_s.clone(), main_p.clone(), main_o.clone(), None);
let all_o = vec![ let default_quad = Quad::new(main_s.clone(), main_p.clone(), main_o.clone(), None);
let named_quad = Quad::new(
main_s.clone(),
main_p.clone(),
main_o.clone(),
main_g.clone(),
);
let default_quads = vec![
Quad::new(main_s.clone(), main_p.clone(), Literal::from(0), None), Quad::new(main_s.clone(), main_p.clone(), Literal::from(0), None),
Quad::new(main_s.clone(), main_p.clone(), main_o.clone(), None), default_quad.clone(),
Quad::new(main_s.clone(), main_p.clone(), Literal::from(2), None), Quad::new(
main_s.clone(),
main_p.clone(),
Literal::from(200000000),
None,
),
];
let all_quads = vec![
Quad::new(main_s.clone(), main_p.clone(), Literal::from(0), None),
default_quad.clone(),
Quad::new(
main_s.clone(),
main_p.clone(),
Literal::from(200000000),
None,
),
named_quad.clone(),
]; ];
let mut repo_path = temp_dir(); let mut repo_path = temp_dir();
@ -1066,29 +1326,27 @@ fn store() -> Result<(), io::Error> {
{ {
let store = RocksDbStore::open(&repo_path)?; let store = RocksDbStore::open(&repo_path)?;
store.insert(&main_quad)?; for t in &all_quads {
for t in &all_o {
store.insert(t)?; store.insert(t)?;
} }
let target = vec![main_quad];
assert_eq!( assert_eq!(
store store
.quads_for_pattern(None, None, None, None) .quads_for_pattern(None, None, None, None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o all_quads
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(Some(main_s.as_ref()), None, None, None) .quads_for_pattern(Some(main_s.as_ref()), None, None, None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o all_quads
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(Some(main_s.as_ref()), Some(main_p.as_ref()), None, None) .quads_for_pattern(Some(main_s.as_ref()), Some(main_p.as_ref()), None, None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o all_quads
); );
assert_eq!( assert_eq!(
store store
@ -1099,7 +1357,7 @@ fn store() -> Result<(), io::Error> {
None None
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone(), named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
@ -1110,7 +1368,18 @@ fn store() -> Result<(), io::Error> {
Some(GraphNameRef::DefaultGraph) Some(GraphNameRef::DefaultGraph)
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone()]
);
assert_eq!(
store
.quads_for_pattern(
Some(main_s.as_ref()),
Some(main_p.as_ref()),
Some(main_o.as_ref()),
Some(main_g.as_ref())
)
.collect::<Result<Vec<_>, _>>()?,
vec![named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
@ -1121,13 +1390,13 @@ fn store() -> Result<(), io::Error> {
Some(GraphNameRef::DefaultGraph) Some(GraphNameRef::DefaultGraph)
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o default_quads
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(Some(main_s.as_ref()), None, Some(main_o.as_ref()), None) .quads_for_pattern(Some(main_s.as_ref()), None, Some(main_o.as_ref()), None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone(), named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
@ -1138,7 +1407,18 @@ fn store() -> Result<(), io::Error> {
Some(GraphNameRef::DefaultGraph) Some(GraphNameRef::DefaultGraph)
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone()]
);
assert_eq!(
store
.quads_for_pattern(
Some(main_s.as_ref()),
None,
Some(main_o.as_ref()),
Some(main_g.as_ref())
)
.collect::<Result<Vec<_>, _>>()?,
vec![named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
@ -1149,31 +1429,31 @@ fn store() -> Result<(), io::Error> {
Some(GraphNameRef::DefaultGraph) Some(GraphNameRef::DefaultGraph)
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o default_quads
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(None, Some(main_p.as_ref()), None, None) .quads_for_pattern(None, Some(main_p.as_ref()), None, None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o all_quads
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(None, Some(main_p.as_ref()), Some(main_o.as_ref()), None) .quads_for_pattern(None, Some(main_p.as_ref()), Some(main_o.as_ref()), None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone(), named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(None, None, Some(main_o.as_ref()), None) .quads_for_pattern(None, None, Some(main_o.as_ref()), None)
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone(), named_quad.clone()]
); );
assert_eq!( assert_eq!(
store store
.quads_for_pattern(None, None, None, Some(GraphNameRef::DefaultGraph)) .quads_for_pattern(None, None, None, Some(GraphNameRef::DefaultGraph))
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
all_o default_quads
); );
assert_eq!( assert_eq!(
store store
@ -1184,7 +1464,18 @@ fn store() -> Result<(), io::Error> {
Some(GraphNameRef::DefaultGraph) Some(GraphNameRef::DefaultGraph)
) )
.collect::<Result<Vec<_>, _>>()?, .collect::<Result<Vec<_>, _>>()?,
target vec![default_quad.clone()]
);
assert_eq!(
store
.quads_for_pattern(
None,
Some(main_p.as_ref()),
Some(main_o.as_ref()),
Some(main_g.as_ref())
)
.collect::<Result<Vec<_>, _>>()?,
vec![named_quad.clone()]
); );
} }

Loading…
Cancel
Save