Upgrades RocksDB to 0.14

pull/35/head
Tpt 5 years ago
parent fe3b32063b
commit 2bbe29c31a
  1. 2
      lib/Cargo.toml
  2. 2
      lib/src/model/blank_node.rs
  3. 1
      lib/src/model/xsd/date_time.rs
  4. 1
      lib/src/model/xsd/parser.rs
  5. 111
      lib/src/store/rocksdb.rs

@ -13,7 +13,7 @@ edition = "2018"
[dependencies] [dependencies]
lazy_static = "1" lazy_static = "1"
rocksdb = { version = "0.13", optional = true } rocksdb = { version = "0.14", optional = true }
quick-xml = "0.18" quick-xml = "0.18"
rand = "0.7" rand = "0.7"
md-5 = "0.8" md-5 = "0.8"

@ -77,7 +77,7 @@ mod test {
#[test] #[test]
fn as_str_full() { fn as_str_full() {
let b = BlankNode::new_from_unique_id(0x77776666555544443333222211110000); let b = BlankNode::new_from_unique_id(0x7777_6666_5555_4444_3333_2222_1111_0000);
assert_eq!(b.as_str(), "77776666555544443333222211110000"); assert_eq!(b.as_str(), "77776666555544443333222211110000");
} }
} }

@ -1,6 +1,5 @@
use super::parser::{date_lexical_rep, date_time_lexical_rep, parse_value, time_lexical_rep}; use super::parser::{date_lexical_rep, date_time_lexical_rep, parse_value, time_lexical_rep};
use super::{Decimal, Duration, XsdParseError}; use super::{Decimal, Duration, XsdParseError};
use rand::distributions::weighted::alias_method::Weight;
use std::cmp::{min, Ordering}; use std::cmp::{min, Ordering};
use std::convert::{TryFrom, TryInto}; use std::convert::{TryFrom, TryInto};
use std::error::Error; use std::error::Error;

@ -14,7 +14,6 @@ use super::date_time::DateTimeError;
use super::decimal::ParseDecimalError; use super::decimal::ParseDecimalError;
use crate::model::xsd::date_time::TimezoneOffset; use crate::model::xsd::date_time::TimezoneOffset;
use nom::bytes::streaming::take_while_m_n; use nom::bytes::streaming::take_while_m_n;
use rand::distributions::weighted::alias_method::Weight;
use std::error::Error; use std::error::Error;
use std::fmt; use std::fmt;
use std::num::ParseIntError; use std::num::ParseIntError;

@ -249,7 +249,7 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
}, },
None => match graph_name { None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()), None => Box::new(self.quads()),
}, },
}, },
}, },
@ -258,7 +258,7 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
} }
impl<'a> RocksDbStoreConnection<'a> { impl<'a> RocksDbStoreConnection<'a> {
fn quads(&self) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { fn quads(&self) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
self.spog_quads(Vec::default()) self.spog_quads(Vec::default())
} }
@ -266,7 +266,7 @@ impl<'a> RocksDbStoreConnection<'a> {
&self, &self,
subject: EncodedTerm, subject: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.spog_quads(encode_term(subject)?) Ok(self.spog_quads(encode_term(subject)?))
} }
fn quads_for_subject_predicate( fn quads_for_subject_predicate(
@ -274,7 +274,7 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.spog_quads(encode_term_pair(subject, predicate)?) Ok(self.spog_quads(encode_term_pair(subject, predicate)?))
} }
fn quads_for_subject_predicate_object( fn quads_for_subject_predicate_object(
@ -283,7 +283,7 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.spog_quads(encode_term_triple(subject, predicate, object)?) Ok(self.spog_quads(encode_term_triple(subject, predicate, object)?))
} }
fn quads_for_subject_object( fn quads_for_subject_object(
@ -291,14 +291,14 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.ospg_quads(encode_term_pair(object, subject)?) Ok(self.ospg_quads(encode_term_pair(object, subject)?))
} }
fn quads_for_predicate( fn quads_for_predicate(
&self, &self,
predicate: EncodedTerm, predicate: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.posg_quads(encode_term(predicate)?) Ok(self.posg_quads(encode_term(predicate)?))
} }
fn quads_for_predicate_object( fn quads_for_predicate_object(
@ -306,21 +306,21 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.posg_quads(encode_term_pair(predicate, object)?) Ok(self.posg_quads(encode_term_pair(predicate, object)?))
} }
fn quads_for_object( fn quads_for_object(
&self, &self,
object: EncodedTerm, object: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.ospg_quads(encode_term(object)?) Ok(self.ospg_quads(encode_term(object)?))
} }
fn quads_for_graph( fn quads_for_graph(
&self, &self,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gspo_quads(encode_term(graph_name)?) Ok(self.gspo_quads(encode_term(graph_name)?))
} }
fn quads_for_subject_graph( fn quads_for_subject_graph(
@ -328,7 +328,7 @@ impl<'a> RocksDbStoreConnection<'a> {
subject: EncodedTerm, subject: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gspo_quads(encode_term_pair(graph_name, subject)?) Ok(self.gspo_quads(encode_term_pair(graph_name, subject)?))
} }
fn quads_for_subject_predicate_graph( fn quads_for_subject_predicate_graph(
@ -337,7 +337,7 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gspo_quads(encode_term_triple(graph_name, subject, predicate)?) Ok(self.gspo_quads(encode_term_triple(graph_name, subject, predicate)?))
} }
fn quads_for_subject_object_graph( fn quads_for_subject_object_graph(
@ -346,7 +346,7 @@ impl<'a> RocksDbStoreConnection<'a> {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gosp_quads(encode_term_triple(graph_name, object, subject)?) Ok(self.gosp_quads(encode_term_triple(graph_name, object, subject)?))
} }
fn quads_for_predicate_graph( fn quads_for_predicate_graph(
@ -354,7 +354,7 @@ impl<'a> RocksDbStoreConnection<'a> {
predicate: EncodedTerm, predicate: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gpos_quads(encode_term_pair(graph_name, predicate)?) Ok(self.gpos_quads(encode_term_pair(graph_name, predicate)?))
} }
fn quads_for_predicate_object_graph( fn quads_for_predicate_object_graph(
@ -363,7 +363,7 @@ impl<'a> RocksDbStoreConnection<'a> {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gpos_quads(encode_term_triple(graph_name, predicate, object)?) Ok(self.gpos_quads(encode_term_triple(graph_name, predicate, object)?))
} }
fn quads_for_object_graph( fn quads_for_object_graph(
@ -371,58 +371,40 @@ impl<'a> RocksDbStoreConnection<'a> {
object: EncodedTerm, object: EncodedTerm,
graph_name: EncodedTerm, graph_name: EncodedTerm,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.gosp_quads(encode_term_pair(graph_name, object)?) Ok(self.gosp_quads(encode_term_pair(graph_name, object)?))
} }
fn spog_quads( fn spog_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.spog_cf, prefix, |buffer| { self.inner_quads(self.spog_cf, prefix, |buffer| {
Cursor::new(buffer).read_spog_quad() Cursor::new(buffer).read_spog_quad()
}) })
} }
fn posg_quads( fn posg_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.posg_cf, prefix, |buffer| { self.inner_quads(self.posg_cf, prefix, |buffer| {
Cursor::new(buffer).read_posg_quad() Cursor::new(buffer).read_posg_quad()
}) })
} }
fn ospg_quads( fn ospg_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.ospg_cf, prefix, |buffer| { self.inner_quads(self.ospg_cf, prefix, |buffer| {
Cursor::new(buffer).read_ospg_quad() Cursor::new(buffer).read_ospg_quad()
}) })
} }
fn gspo_quads( fn gspo_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.gspo_cf, prefix, |buffer| { self.inner_quads(self.gspo_cf, prefix, |buffer| {
Cursor::new(buffer).read_gspo_quad() Cursor::new(buffer).read_gspo_quad()
}) })
} }
fn gpos_quads( fn gpos_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.gpos_cf, prefix, |buffer| { self.inner_quads(self.gpos_cf, prefix, |buffer| {
Cursor::new(buffer).read_gpos_quad() Cursor::new(buffer).read_gpos_quad()
}) })
} }
fn gosp_quads( fn gosp_quads(&self, prefix: Vec<u8>) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
&self,
prefix: Vec<u8>,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
self.inner_quads(self.gosp_cf, prefix, |buffer| { self.inner_quads(self.gosp_cf, prefix, |buffer| {
Cursor::new(buffer).read_gosp_quad() Cursor::new(buffer).read_gosp_quad()
}) })
@ -433,14 +415,14 @@ impl<'a> RocksDbStoreConnection<'a> {
cf: &ColumnFamily, cf: &ColumnFamily,
prefix: Vec<u8>, prefix: Vec<u8>,
decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a, decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> impl Iterator<Item = Result<EncodedQuad>> + 'a {
let mut iter = self.store.db.raw_iterator_cf(cf)?; let mut iter = self.store.db.raw_iterator_cf(cf);
iter.seek(&prefix); iter.seek(&prefix);
Ok(DecodingIndexIterator { DecodingIndexIterator {
iter, iter,
prefix, prefix,
decode, decode,
}) }
} }
} }
@ -450,7 +432,8 @@ pub struct RocksDbStoreTransaction<'a> {
impl StrContainer for RocksDbStoreTransaction<'_> { impl StrContainer for RocksDbStoreTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.inner.insert_str(key, value) self.inner.insert_str(key, value);
Ok(())
} }
} }
@ -474,7 +457,8 @@ pub struct RocksDbStoreAutoTransaction<'a> {
impl StrContainer for RocksDbStoreAutoTransaction<'_> { impl StrContainer for RocksDbStoreAutoTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
self.inner.insert_str(key, value) self.inner.insert_str(key, value);
Ok(())
} }
} }
@ -514,41 +498,40 @@ struct RocksDbStoreInnerTransaction<'a> {
} }
impl RocksDbStoreInnerTransaction<'_> { impl RocksDbStoreInnerTransaction<'_> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { fn insert_str(&mut self, key: u128, value: &str) {
self.batch self.batch
.put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value)?; .put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value)
Ok(())
} }
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
self.buffer.write_spog_quad(quad)?; self.buffer.write_spog_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.spog_cf, &self.buffer, &[])?; .put_cf(self.connection.spog_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_posg_quad(quad)?; self.buffer.write_posg_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.posg_cf, &self.buffer, &[])?; .put_cf(self.connection.posg_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_ospg_quad(quad)?; self.buffer.write_ospg_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.ospg_cf, &self.buffer, &[])?; .put_cf(self.connection.ospg_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gspo_quad(quad)?; self.buffer.write_gspo_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.gspo_cf, &self.buffer, &[])?; .put_cf(self.connection.gspo_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gpos_quad(quad)?; self.buffer.write_gpos_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.gpos_cf, &self.buffer, &[])?; .put_cf(self.connection.gpos_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gosp_quad(quad)?; self.buffer.write_gosp_quad(quad)?;
self.batch self.batch
.put_cf(self.connection.gosp_cf, &self.buffer, &[])?; .put_cf(self.connection.gosp_cf, &self.buffer, &[]);
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())
@ -556,33 +539,27 @@ impl RocksDbStoreInnerTransaction<'_> {
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
self.buffer.write_spog_quad(quad)?; self.buffer.write_spog_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.spog_cf, &self.buffer);
.delete_cf(self.connection.spog_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
self.buffer.write_posg_quad(quad)?; self.buffer.write_posg_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.posg_cf, &self.buffer);
.delete_cf(self.connection.posg_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
self.buffer.write_ospg_quad(quad)?; self.buffer.write_ospg_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.ospg_cf, &self.buffer);
.delete_cf(self.connection.ospg_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gspo_quad(quad)?; self.buffer.write_gspo_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.gspo_cf, &self.buffer);
.delete_cf(self.connection.gspo_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gpos_quad(quad)?; self.buffer.write_gpos_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.gpos_cf, &self.buffer);
.delete_cf(self.connection.gpos_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
self.buffer.write_gosp_quad(quad)?; self.buffer.write_gosp_quad(quad)?;
self.batch self.batch.delete_cf(self.connection.gosp_cf, &self.buffer);
.delete_cf(self.connection.gosp_cf, &self.buffer)?;
self.buffer.clear(); self.buffer.clear();
Ok(()) Ok(())

Loading…
Cancel
Save