Adds basic RocksDB store

pull/10/head
Tpt 6 years ago
parent 28cb7b276c
commit 49eda33d0a
  1. 4
      .travis.yml
  2. 4
      Cargo.toml
  3. 8
      src/errors.rs
  4. 4
      src/lib.rs
  5. 6
      src/model/blank_node.rs
  6. 2
      src/store/mod.rs
  7. 386
      src/store/numeric_encoder.rs
  8. 446
      src/store/rocksdb.rs
  9. 8
      src/utils.rs

@ -8,3 +8,7 @@ matrix:
- rust: nightly
fast_finish: true
cache: cargo
addons:
apt:
packages:
- clang

@ -15,8 +15,10 @@ build = "build.rs"
travis-ci = { repository = "Tpt/rudf" }
[dependencies]
error-chain = "0.12"
lazy_static = "1.0"
url = "1.7"
rocksdb = "0.10"
url = "1"
uuid = { version = "0.6", features = ["v4"] }
[build-dependencies]

@ -0,0 +1,8 @@
error_chain! {
foreign_links {
Url(::url::ParseError);
Uuid(::uuid::ParseError);
RocksDB(::rocksdb::Error);
Utf8(::std::str::Utf8Error);
}
}

@ -1,8 +1,12 @@
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate lazy_static;
extern crate rocksdb;
extern crate url;
extern crate uuid;
pub mod errors;
pub mod model;
pub mod rio;
pub mod sparql;

@ -28,3 +28,9 @@ impl Default for BlankNode {
BlankNode { id: Uuid::new_v4() }
}
}
impl From<Uuid> for BlankNode {
fn from(id: Uuid) -> Self {
Self { id }
}
}

@ -1,2 +1,4 @@
pub mod isomorphism;
pub mod memory;
mod numeric_encoder;
pub mod rocksdb;

@ -0,0 +1,386 @@
use errors::*;
use model::*;
use std::ops::Deref;
use std::str;
use std::str::FromStr;
use url::Url;
use uuid::Uuid;
pub const STRING_KEY_SIZE: usize = 8;
pub trait BytesStore {
type BytesOutput: Deref<Target = [u8]>;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()>;
fn get(&self, id: &[u8]) -> Result<Option<Self::BytesOutput>>;
}
const TYPE_KEY_SIZE: usize = 1;
const TYPE_NAMED_NODE_ID: u8 = 1;
const TYPE_BLANK_NODE_ID: u8 = 2;
const TYPE_LANG_STRING_LITERAL_ID: u8 = 3;
const TYPE_TYPED_LITERAL_ID: u8 = 4;
pub const TERM_ENCODING_SIZE: usize = TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE;
const EMPTY_TERM: [u8; TERM_ENCODING_SIZE] = [0 as u8; TERM_ENCODING_SIZE];
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
pub struct EncodedTerm([u8; TERM_ENCODING_SIZE]);
impl EncodedTerm {
pub fn new_from_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != TERM_ENCODING_SIZE {
return Err("the term buffer has not the correct length".into());
}
let mut buf = [0 as u8; TERM_ENCODING_SIZE];
buf.copy_from_slice(buffer);
return Ok(EncodedTerm(buf));
}
}
impl AsRef<[u8]> for EncodedTerm {
fn as_ref(&self) -> &[u8] {
&self.0[..]
}
}
#[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Clone, Hash)]
pub struct EncodedQuad {
pub subject: EncodedTerm,
pub predicate: EncodedTerm,
pub object: EncodedTerm,
pub graph_name: EncodedTerm,
}
impl EncodedQuad {
pub fn new_from_spog_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE {
return Err("the spog buffer has not the correct length".into());
}
Ok(Self {
subject: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?,
predicate: EncodedTerm::new_from_buffer(
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE],
)?,
object: EncodedTerm::new_from_buffer(
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE],
)?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
})
}
pub fn new_from_posg_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE {
return Err("the posg buffer has not the correct length".into());
}
Ok(Self {
subject: EncodedTerm::new_from_buffer(
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE],
)?,
predicate: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?,
object: EncodedTerm::new_from_buffer(
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE],
)?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
})
}
pub fn new_from_ospg_buffer(buffer: &[u8]) -> Result<Self> {
if buffer.len() != 4 * TERM_ENCODING_SIZE {
return Err("the ospg buffer has not the correct length".into());
}
Ok(Self {
subject: EncodedTerm::new_from_buffer(
&buffer[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE],
)?,
predicate: EncodedTerm::new_from_buffer(
&buffer[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE],
)?,
object: EncodedTerm::new_from_buffer(&buffer[0..TERM_ENCODING_SIZE])?,
graph_name: EncodedTerm::new_from_buffer(
&buffer[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE],
)?,
})
}
pub fn spog(&self) -> [u8; 4 * TERM_ENCODING_SIZE] {
let mut spog = [0 as u8; 4 * TERM_ENCODING_SIZE];
spog[0..TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref());
spog[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref());
spog[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref());
spog[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE]
.copy_from_slice(self.graph_name.as_ref());
spog
}
pub fn posg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] {
let mut posg = [0 as u8; 4 * TERM_ENCODING_SIZE];
posg[0..TERM_ENCODING_SIZE].copy_from_slice(self.predicate.as_ref());
posg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref());
posg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref());
posg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE]
.copy_from_slice(self.graph_name.as_ref());
posg
}
pub fn ospg(&self) -> [u8; 4 * TERM_ENCODING_SIZE] {
let mut ospg = [0 as u8; 4 * TERM_ENCODING_SIZE];
ospg[0..TERM_ENCODING_SIZE].copy_from_slice(self.object.as_ref());
ospg[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(self.subject.as_ref());
ospg[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE]
.copy_from_slice(self.predicate.as_ref());
ospg[3 * TERM_ENCODING_SIZE..4 * TERM_ENCODING_SIZE]
.copy_from_slice(self.graph_name.as_ref());
ospg
}
}
pub struct Encoder<S: BytesStore> {
string_store: S,
}
impl<S: BytesStore> Encoder<S> {
pub fn new(string_store: S) -> Self {
Self { string_store }
}
pub fn encode_named_node(&self, named_node: &NamedNode) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE];
bytes[0] = TYPE_NAMED_NODE_ID;
self.encode_str_value_to_lower_bytes(named_node.as_str(), &mut bytes)?;
Ok(EncodedTerm(bytes))
}
pub fn encode_blank_node(&self, blank_node: &BlankNode) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE];
bytes[0] = TYPE_BLANK_NODE_ID;
bytes[TYPE_KEY_SIZE..TERM_ENCODING_SIZE].copy_from_slice(blank_node.as_bytes());
Ok(EncodedTerm(bytes))
}
pub fn encode_literal(&self, literal: &Literal) -> Result<EncodedTerm> {
let mut bytes = [0 as u8; TERM_ENCODING_SIZE];
if let Some(language) = literal.language() {
bytes[0] = TYPE_LANG_STRING_LITERAL_ID;
self.encode_str_value_to_upper_bytes(language, &mut bytes)?;
} else {
bytes[0] = TYPE_TYPED_LITERAL_ID;
self.encode_str_value_to_upper_bytes(literal.datatype().as_str(), &mut bytes)?;
}
self.encode_str_value_to_lower_bytes(literal.value().as_str(), &mut bytes)?;
Ok(EncodedTerm(bytes))
}
pub fn encode_named_or_blank_node(&self, term: &NamedOrBlankNode) -> Result<EncodedTerm> {
match term {
NamedOrBlankNode::NamedNode(named_node) => self.encode_named_node(named_node),
NamedOrBlankNode::BlankNode(blank_node) => self.encode_blank_node(blank_node),
}
}
pub fn encode_optional_named_or_blank_node(
&self,
term: &Option<NamedOrBlankNode>,
) -> Result<EncodedTerm> {
match term {
Some(node) => self.encode_named_or_blank_node(node),
None => Ok(EncodedTerm(EMPTY_TERM)),
}
}
pub fn encode_term(&self, term: &Term) -> Result<EncodedTerm> {
match term {
Term::NamedNode(named_node) => self.encode_named_node(named_node),
Term::BlankNode(blank_node) => self.encode_blank_node(blank_node),
Term::Literal(literal) => self.encode_literal(literal),
}
}
pub fn encode_quad(&self, quad: &Quad) -> Result<EncodedQuad> {
Ok(EncodedQuad {
subject: self.encode_named_or_blank_node(quad.subject())?,
predicate: self.encode_named_node(quad.predicate())?,
object: self.encode_term(quad.object())?,
graph_name: self.encode_optional_named_or_blank_node(quad.graph_name())?,
})
}
pub fn decode_term(&self, encoded: impl AsRef<[u8]>) -> Result<Term> {
let encoding = encoded.as_ref();
match encoding[0] {
TYPE_NAMED_NODE_ID => {
let iri = self.decode_url_value_from_lower_bytes(encoding)?;
Ok(NamedNode::from(iri).into())
}
TYPE_BLANK_NODE_ID => Ok(BlankNode::from(Uuid::from_bytes(&encoding[1..])?).into()),
TYPE_LANG_STRING_LITERAL_ID => {
let value = self.decode_str_value_from_lower_bytes(encoding)?;
let language = self.decode_str_value_from_upper_bytes(encoding)?;
Ok(Literal::new_language_tagged_literal(value, language).into())
}
TYPE_TYPED_LITERAL_ID => {
let value = self.decode_str_value_from_lower_bytes(encoding)?;
let datatype = NamedNode::from(self.decode_url_value_from_upper_bytes(encoding)?);
Ok(Literal::new_typed_literal(value, datatype).into())
}
_ => Err("invalid term type encoding".into()),
}
}
pub fn decode_named_or_blank_node(
&self,
encoded: impl AsRef<[u8]>,
) -> Result<NamedOrBlankNode> {
let encoding = encoded.as_ref();
match self.decode_term(encoding)? {
Term::NamedNode(named_node) => Ok(named_node.into()),
Term::BlankNode(blank_node) => Ok(blank_node.into()),
Term::Literal(_) => Err("A literal has ben found instead of a named node".into()),
}
}
pub fn decode_optional_named_or_blank_node(
&self,
encoded: impl AsRef<[u8]>,
) -> Result<Option<NamedOrBlankNode>> {
let encoding = encoded.as_ref();
if encoding == EMPTY_TERM {
Ok(None)
} else {
Ok(Some(self.decode_named_or_blank_node(encoding)?))
}
}
pub fn decode_named_node(&self, encoded: impl AsRef<[u8]>) -> Result<NamedNode> {
let encoding = encoded.as_ref();
match self.decode_term(encoding)? {
Term::NamedNode(named_node) => Ok(named_node),
Term::BlankNode(_) => Err("A blank node has been found instead of a named node".into()),
Term::Literal(_) => Err("A literal has ben found instead of a named node".into()),
}
}
pub fn decode_quad(&self, encoded: EncodedQuad) -> Result<Quad> {
Ok(Quad::new(
self.decode_named_or_blank_node(encoded.subject)?,
self.decode_named_node(encoded.predicate)?,
self.decode_term(encoded.object)?,
self.decode_optional_named_or_blank_node(encoded.graph_name)?,
))
}
fn encode_str_value_to_upper_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> {
self.string_store.put(
text.as_bytes(),
&mut bytes[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE],
)
}
fn encode_str_value_to_lower_bytes(&self, text: &str, bytes: &mut [u8]) -> Result<()> {
self.string_store.put(
text.as_bytes(),
&mut bytes[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE],
)
}
fn decode_str_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<String> {
let bytes = self.decode_value_from_upper_bytes(encoding)?;
Ok(str::from_utf8(&bytes)?.to_string())
}
fn decode_url_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<Url> {
let bytes = self.decode_value_from_upper_bytes(encoding)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?)
}
fn decode_value_from_upper_bytes(&self, encoding: &[u8]) -> Result<S::BytesOutput> {
self.string_store
.get(&encoding[TYPE_KEY_SIZE..TYPE_KEY_SIZE + STRING_KEY_SIZE])?
.ok_or(Error::from("value not found in the dictionary"))
}
fn decode_str_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<String> {
let bytes = self.decode_value_from_lower_bytes(encoding)?;
Ok(str::from_utf8(&bytes)?.to_string())
}
fn decode_url_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<Url> {
let bytes = self.decode_value_from_lower_bytes(encoding)?;
Ok(Url::from_str(str::from_utf8(&bytes)?)?)
}
fn decode_value_from_lower_bytes(&self, encoding: &[u8]) -> Result<S::BytesOutput> {
self.string_store
.get(&encoding[TYPE_KEY_SIZE + STRING_KEY_SIZE..TYPE_KEY_SIZE + 2 * STRING_KEY_SIZE])?
.ok_or(Error::from("value not found in the dictionary"))
}
}
impl<S: BytesStore + Default> Default for Encoder<S> {
fn default() -> Self {
Self {
string_store: S::default(),
}
}
}
mod test {
use errors::*;
use model::*;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::str::FromStr;
use store::numeric_encoder::BytesStore;
use store::numeric_encoder::Encoder;
use store::numeric_encoder::STRING_KEY_SIZE;
use store::numeric_encoder::TERM_ENCODING_SIZE;
use utils::to_bytes;
#[derive(Default)]
struct MemoryBytesStore {
id2str: RefCell<BTreeMap<[u8; STRING_KEY_SIZE], Vec<u8>>>,
str2id: RefCell<BTreeMap<Vec<u8>, [u8; STRING_KEY_SIZE]>>,
}
impl BytesStore for MemoryBytesStore {
type BytesOutput = Vec<u8>;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> {
let mut str2id = self.str2id.borrow_mut();
let mut id2str = self.id2str.borrow_mut();
let id = str2id.entry(value.to_vec()).or_insert_with(|| {
let id = to_bytes(id2str.len());
id2str.insert(id, value.to_vec());
id
});
id_buffer.copy_from_slice(id);
Ok(())
}
fn get(&self, id: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(self.id2str.borrow().get(id).map(|s| s.to_owned()))
}
}
#[test]
fn test_encoding() {
let encoder: Encoder<MemoryBytesStore> = Encoder::default();
let terms: Vec<Term> = vec![
NamedNode::from_str("http://foo.com").unwrap().into(),
NamedNode::from_str("http://bar.com").unwrap().into(),
NamedNode::from_str("http://foo.com").unwrap().into(),
BlankNode::default().into(),
Literal::from(true).into(),
Literal::from(1.2).into(),
Literal::from("foo").into(),
Literal::new_language_tagged_literal("foo", "fr").into(),
];
for term in terms {
let encoded = encoder.encode_term(&term).unwrap();
assert_eq!(term, encoder.decode_term(encoded).unwrap())
}
}
}

@ -0,0 +1,446 @@
use errors::*;
use model::*;
use rocksdb::ColumnFamily;
use rocksdb::DBRawIterator;
use rocksdb::DBVector;
use rocksdb::IteratorMode;
use rocksdb::Options;
use rocksdb::WriteBatch;
use rocksdb::DB;
use std::ops::Deref;
use std::path::Path;
use std::slice;
use std::str;
use store::numeric_encoder::BytesStore;
use store::numeric_encoder::EncodedQuad;
use store::numeric_encoder::EncodedTerm;
use store::numeric_encoder::Encoder;
use store::numeric_encoder::STRING_KEY_SIZE;
use store::numeric_encoder::TERM_ENCODING_SIZE;
use utils::to_bytes;
pub struct RocksDbDataset {
store: RocksDbStore,
}
impl RocksDbDataset {
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {
store: RocksDbStore::open(path)?,
})
}
fn graph(&self, name: &NamedOrBlankNode) -> RocksDbGraph {
RocksDbGraph {
store: &self.store,
name: name.clone(),
}
}
fn default_graph(&self) -> RocksDbDefaultGraph {
RocksDbDefaultGraph { store: &self.store }
}
fn union_graph(&self) -> RocksDbUnionGraph {
RocksDbUnionGraph { store: &self.store }
}
fn iter(&self) -> Result<QuadsIterator<SPOGIndexIterator>> {
Ok(QuadsIterator {
iter: self.store.quads()?,
encoder: self.store.encoder(),
})
}
fn quads_for_subject(
&self,
subject: &NamedOrBlankNode,
) -> Result<QuadsIterator<FilteringEncodedQuadsIterator<SPOGIndexIterator>>> {
Ok(QuadsIterator {
iter: self.store
.quads_for_subject(self.store.encoder().encode_named_or_blank_node(subject)?)?,
encoder: self.store.encoder(),
})
}
fn contains(&self, quad: &Quad) -> Result<bool> {
self.store
.contains(&self.store.encoder().encode_quad(quad)?)
}
fn insert(&self, quad: &Quad) -> Result<()> {
self.store.insert(&self.store.encoder().encode_quad(quad)?)
}
fn remove(&self, quad: &Quad) -> Result<()> {
self.store.remove(&self.store.encoder().encode_quad(quad)?)
}
}
struct RocksDbGraph<'a> {
store: &'a RocksDbStore,
name: NamedOrBlankNode, //TODO: better storage
}
struct RocksDbDefaultGraph<'a> {
store: &'a RocksDbStore,
}
struct RocksDbUnionGraph<'a> {
store: &'a RocksDbStore,
}
const ID2STR_CF: &'static str = "id2str";
const STR2ID_CF: &'static str = "id2str";
const SPOG_CF: &'static str = "spog";
const POSG_CF: &'static str = "posg";
const OSPG_CF: &'static str = "ospg";
const EMPTY_BUF: [u8; 0] = [0 as u8; 0];
//TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving)
const COLUMN_FAMILIES: [&'static str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF];
struct RocksDbStore {
db: DB,
id2str_cf: ColumnFamily,
str2id_cf: ColumnFamily,
spog_cf: ColumnFamily,
posg_cf: ColumnFamily,
ospg_cf: ColumnFamily,
}
impl RocksDbStore {
fn open(path: impl AsRef<Path>) -> Result<Self> {
let options = Options::default();
let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?;
let id2str_cf = get_cf(&db, STR2ID_CF)?;
let str2id_cf = get_cf(&db, ID2STR_CF)?;
let spog_cf = get_cf(&db, SPOG_CF)?;
let posg_cf = get_cf(&db, POSG_CF)?;
let ospg_cf = get_cf(&db, OSPG_CF)?;
Ok(Self {
db,
id2str_cf,
str2id_cf,
spog_cf,
posg_cf,
ospg_cf,
})
}
fn encoder(&self) -> Encoder<RocksDbBytesStore> {
Encoder::new(RocksDbBytesStore(&self))
}
fn quads(&self) -> Result<SPOGIndexIterator> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek_to_first();
Ok(SPOGIndexIterator { iter })
}
fn quads_for_subject(
&self,
subject: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(subject.as_ref());
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), None, None, None),
})
}
fn quads_for_subject_predicate(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(&subject, &predicate));
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None),
})
}
fn quads_for_subject_predicate_object(
&self,
subject: EncodedTerm,
predicate: EncodedTerm,
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<SPOGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_triple(&subject, &predicate, &object));
Ok(FilteringEncodedQuadsIterator {
iter: SPOGIndexIterator { iter },
filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None),
})
}
fn quads_for_predicate(
&self,
predicate: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.posg_cf)?;
iter.seek(predicate.as_ref());
Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), None, None),
})
}
fn quads_for_predicate_object(
&self,
predicate: EncodedTerm,
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<POSGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.spog_cf)?;
iter.seek(&encode_term_pair(&predicate, &object));
Ok(FilteringEncodedQuadsIterator {
iter: POSGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None),
})
}
fn quads_for_object(
&self,
object: EncodedTerm,
) -> Result<FilteringEncodedQuadsIterator<OSPGIndexIterator>> {
let mut iter = self.db.raw_iterator_cf(self.ospg_cf)?;
iter.seek(object.as_ref());
Ok(FilteringEncodedQuadsIterator {
iter: OSPGIndexIterator { iter },
filter: EncodedQuadPattern::new(None, None, Some(object), None),
})
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self.db.get_cf(self.spog_cf, &quad.spog())?.is_some())
}
fn insert(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.put_cf(self.spog_cf, &quad.spog(), &EMPTY_BUF)?;
batch.put_cf(self.posg_cf, &quad.posg(), &EMPTY_BUF)?;
batch.put_cf(self.ospg_cf, &quad.ospg(), &EMPTY_BUF)?;
Ok(self.db.write(batch)?) //TODO: check what's going on if the key already exists
}
fn remove(&self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
batch.delete_cf(self.spog_cf, &quad.spog())?;
batch.delete_cf(self.posg_cf, &quad.posg())?;
batch.delete_cf(self.ospg_cf, &quad.ospg())?;
Ok(self.db.write(batch)?)
}
}
fn get_cf(db: &DB, name: &str) -> Result<ColumnFamily> {
db.cf_handle(name)
.ok_or_else(|| Error::from("column family not found"))
}
struct RocksDbBytesStore<'a>(&'a RocksDbStore);
impl<'a> BytesStore for RocksDbBytesStore<'a> {
type BytesOutput = DBVector;
fn put(&self, value: &[u8], id_buffer: &mut [u8]) -> Result<()> {
match self.0.db.get_cf(self.0.str2id_cf, value)? {
Some(id) => id_buffer.copy_from_slice(&id),
None => {
let mut batch = WriteBatch::default();
// TODO: id allocation
let id = [0 as u8; STRING_KEY_SIZE];
batch.put_cf(self.0.id2str_cf, &id, value)?;
batch.put_cf(self.0.str2id_cf, value, &id)?;
self.0.db.write(batch)?;
id_buffer.copy_from_slice(&id)
}
}
Ok(())
}
fn get(&self, id: &[u8]) -> Result<Option<DBVector>> {
Ok(self.0.db.get_cf(self.0.id2str_cf, id)?)
}
}
struct EncodedQuadPattern {
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
}
impl EncodedQuadPattern {
fn new(
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Self {
Self {
subject,
predicate,
object,
graph_name,
}
}
fn filter(&self, quad: &EncodedQuad) -> bool {
if let Some(ref subject) = self.subject {
if &quad.subject != subject {
return false;
}
}
if let Some(ref predicate) = self.predicate {
if &quad.predicate != predicate {
return false;
}
}
if let Some(ref object) = self.object {
if &quad.object != object {
return false;
}
}
if let Some(ref graph_name) = self.graph_name {
if &quad.graph_name != graph_name {
return false;
}
}
true
}
}
fn encode_term_pair(t1: &EncodedTerm, t2: &EncodedTerm) -> [u8; 2 * TERM_ENCODING_SIZE] {
let mut bytes = [0 as u8; 2 * TERM_ENCODING_SIZE];
bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref());
bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref());
bytes
}
fn encode_term_triple(
t1: &EncodedTerm,
t2: &EncodedTerm,
t3: &EncodedTerm,
) -> [u8; 3 * TERM_ENCODING_SIZE] {
let mut bytes = [0 as u8; 3 * TERM_ENCODING_SIZE];
bytes[0..TERM_ENCODING_SIZE].copy_from_slice(t1.as_ref());
bytes[TERM_ENCODING_SIZE..2 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref());
bytes[2 * TERM_ENCODING_SIZE..3 * TERM_ENCODING_SIZE].copy_from_slice(t2.as_ref());
bytes
}
struct SPOGIndexIterator {
iter: DBRawIterator,
}
impl Iterator for SPOGIndexIterator {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
self.iter
.key()
.map(|buffer| EncodedQuad::new_from_spog_buffer(&buffer))
}
}
struct POSGIndexIterator {
iter: DBRawIterator,
}
impl Iterator for POSGIndexIterator {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
self.iter
.key()
.map(|buffer| EncodedQuad::new_from_posg_buffer(&buffer))
}
}
struct OSPGIndexIterator {
iter: DBRawIterator,
}
impl Iterator for OSPGIndexIterator {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next();
self.iter
.key()
.map(|buffer| EncodedQuad::new_from_ospg_buffer(&buffer))
}
}
struct FilteringEncodedQuadsIterator<I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
filter: EncodedQuadPattern,
}
impl<I: Iterator<Item = Result<EncodedQuad>>> Iterator for FilteringEncodedQuadsIterator<I> {
type Item = Result<EncodedQuad>;
fn next(&mut self) -> Option<Result<EncodedQuad>> {
self.iter.next().filter(|quad| match quad {
Ok(quad) => self.filter.filter(quad),
Err(e) => true,
})
}
}
struct QuadsIterator<'a, I: Iterator<Item = Result<EncodedQuad>>> {
iter: I,
encoder: Encoder<RocksDbBytesStore<'a>>,
}
impl<'a, I: Iterator<Item = Result<EncodedQuad>>> Iterator for QuadsIterator<'a, I> {
type Item = Result<Quad>;
fn next(&mut self) -> Option<Result<Quad>> {
self.iter
.next()
.map(|k| k.and_then(|quad| self.encoder.decode_quad(quad)))
}
}
/*fn encode_sp(
encoder: &Encoder<RocksDbBytesStore>,
subject: &NamedOrBlankNode,
predicate: &NamedNode,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut sp = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_named_or_blank_node(subject, &mut sp)?;
encoder.encode_named_node(predicate, &mut sp)?;
Ok(sp)
}
fn encode_po(
encoder: &Encoder<RocksDbBytesStore>,
predicate: &NamedNode,
object: &Term,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_named_node(predicate, &mut po)?;
encoder.encode_term(object, &mut po)?;
Ok(po)
}
fn encode_os(
encoder: &Encoder<RocksDbBytesStore>,
object: &Term,
subject: &NamedOrBlankNode,
) -> Result<[u8; 2 * TERM_ENCODING_SIZE]> {
let mut po = [0 as u8; 2 * TERM_ENCODING_SIZE];
encoder.encode_term(object, &mut po)?;
encoder.encode_named_or_blank_node(subject, &mut po)?;
Ok(po)
}*/

@ -1,3 +1,6 @@
use std::mem::size_of;
use std::mem::transmute;
pub trait Escaper {
fn escape(&self) -> String;
}
@ -77,3 +80,8 @@ impl ExactSizeIterator for EscapeRDF {
}
}
}
pub fn to_bytes(int: usize) -> [u8; size_of::<usize>()] {
//TODO: remove when next rust version stabilize this method
unsafe { transmute(int) }
}

Loading…
Cancel
Save