Migrates to RocksDB 0.13

pull/22/head
Tpt 5 years ago
parent 1cd0691520
commit b369eb8126
  1. 2
      lib/Cargo.toml
  2. 53
      lib/src/sparql/eval.rs
  3. 15
      lib/src/sparql/plan.rs
  4. 2
      lib/src/store/memory.rs
  5. 9
      lib/src/store/numeric_encoder.rs
  6. 69
      lib/src/store/rocksdb.rs

@ -14,7 +14,7 @@ edition = "2018"
[dependencies] [dependencies]
lazy_static = "1" lazy_static = "1"
rocksdb = { version = "0.12", optional = true } rocksdb = { version = "0.13", optional = true }
byteorder = { version = "1", features = ["i128"] } byteorder = { version = "1", features = ["i128"] }
quick-xml = "0.17" quick-xml = "0.17"
ordered-float = "1" ordered-float = "1"

@ -29,7 +29,6 @@ use std::fmt::Write;
use std::hash::Hash; use std::hash::Hash;
use std::iter::Iterator; use std::iter::Iterator;
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::ops::Deref;
use std::str; use std::str;
use std::sync::Mutex; use std::sync::Mutex;
@ -1451,10 +1450,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
} }
} }
fn to_simple_string( fn to_simple_string(&self, term: EncodedTerm) -> Option<String> {
&self,
term: EncodedTerm,
) -> Option<<DatasetView<S> as StrLookup>::StrType> {
if let EncodedTerm::StringLiteral { value_id } = term { if let EncodedTerm::StringLiteral { value_id } = term {
self.dataset.get_str(value_id).ok()? self.dataset.get_str(value_id).ok()?
} else { } else {
@ -1470,7 +1466,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
} }
} }
fn to_string(&self, term: EncodedTerm) -> Option<<DatasetView<S> as StrLookup>::StrType> { fn to_string(&self, term: EncodedTerm) -> Option<String> {
match term { match term {
EncodedTerm::StringLiteral { value_id } EncodedTerm::StringLiteral { value_id }
| EncodedTerm::LangStringLiteral { value_id, .. } => { | EncodedTerm::LangStringLiteral { value_id, .. } => {
@ -1480,10 +1476,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
} }
} }
fn to_string_and_language( fn to_string_and_language(&self, term: EncodedTerm) -> Option<(String, Option<u128>)> {
&self,
term: EncodedTerm,
) -> Option<(<DatasetView<S> as StrLookup>::StrType, Option<u128>)> {
match term { match term {
EncodedTerm::StringLiteral { value_id } => { EncodedTerm::StringLiteral { value_id } => {
Some((self.dataset.get_str(value_id).ok()??, None)) Some((self.dataset.get_str(value_id).ok()??, None))
@ -1533,11 +1526,7 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
&self, &self,
arg1: EncodedTerm, arg1: EncodedTerm,
arg2: EncodedTerm, arg2: EncodedTerm,
) -> Option<( ) -> Option<(String, String, Option<u128>)> {
<DatasetView<S> as StrLookup>::StrType,
<DatasetView<S> as StrLookup>::StrType,
Option<u128>,
)> {
let (value1, language1) = self.to_string_and_language(arg1)?; let (value1, language1) = self.to_string_and_language(arg1)?;
let (value2, language2) = self.to_string_and_language(arg2)?; let (value2, language2) = self.to_string_and_language(arg2)?;
if language2.is_none() || language1 == language2 { if language2.is_none() || language1 == language2 {
@ -1893,40 +1882,6 @@ impl<'a, S: StoreConnection + 'a> SimpleEvaluator<S> {
} }
} }
pub enum StringOrStoreString<S: Deref<Target = str> + ToString + Into<String>> {
String(String),
Store(S),
}
impl<S: Deref<Target = str> + ToString + Into<String>> Deref for StringOrStoreString<S> {
type Target = str;
fn deref(&self) -> &str {
match self {
StringOrStoreString::String(s) => &*s,
StringOrStoreString::Store(s) => &*s,
}
}
}
impl<S: Deref<Target = str> + ToString + Into<String>> ToString for StringOrStoreString<S> {
fn to_string(&self) -> String {
match self {
StringOrStoreString::String(s) => s.to_string(),
StringOrStoreString::Store(s) => s.to_string(),
}
}
}
impl<S: Deref<Target = str> + ToString + Into<String>> From<StringOrStoreString<S>> for String {
fn from(string: StringOrStoreString<S>) -> Self {
match string {
StringOrStoreString::String(s) => s,
StringOrStoreString::Store(s) => s.into(),
}
}
}
enum NumericBinaryOperands { enum NumericBinaryOperands {
Float(f32, f32), Float(f32, f32),
Double(f64, f64), Double(f64, f64),

@ -1,4 +1,3 @@
use crate::sparql::eval::StringOrStoreString;
use crate::sparql::model::Variable; use crate::sparql::model::Variable;
use crate::sparql::GraphPattern; use crate::sparql::GraphPattern;
use crate::store::numeric_encoder::{ use crate::store::numeric_encoder::{
@ -493,16 +492,12 @@ impl<S: StoreConnection> DatasetView<S> {
} }
impl<S: StoreConnection> StrLookup for DatasetView<S> { impl<S: StoreConnection> StrLookup for DatasetView<S> {
type StrType = StringOrStoreString<S::StrType>; fn get_str(&self, id: u128) -> Result<Option<String>> {
if let Some(value) = self.extra.borrow().get_str(id)? {
fn get_str(&self, id: u128) -> Result<Option<StringOrStoreString<S::StrType>>> { Ok(Some(value))
Ok(if let Some(value) = self.extra.borrow().get_str(id)? {
Some(StringOrStoreString::String(value))
} else if let Some(value) = self.store.get_str(id)? {
Some(StringOrStoreString::Store(value))
} else { } else {
None self.store.get_str(id)
}) }
} }
} }

@ -77,8 +77,6 @@ impl<'a> Store for &'a MemoryStore {
} }
impl<'a> StrLookup for &'a MemoryStore { impl<'a> StrLookup for &'a MemoryStore {
type StrType = String;
fn get_str(&self, id: u128) -> Result<Option<String>> { fn get_str(&self, id: u128) -> Result<Option<String>> {
self.indexes()?.str_store.get_str(id) self.indexes()?.str_store.get_str(id)
} }

@ -16,7 +16,6 @@ use std::collections::HashMap;
use std::io::Read; use std::io::Read;
use std::io::Write; use std::io::Write;
use std::mem::size_of; use std::mem::size_of;
use std::ops::Deref;
use std::str; use std::str;
const EMPTY_STRING_ID: u128 = 0x7e42_f8ec_9809_80e9_04b2_008f_d98c_1dd4; const EMPTY_STRING_ID: u128 = 0x7e42_f8ec_9809_80e9_04b2_008f_d98c_1dd4;
@ -693,9 +692,7 @@ impl<W: Write> TermWriter for W {
} }
pub trait StrLookup { pub trait StrLookup {
type StrType: Deref<Target = str> + ToString + Into<String>; fn get_str(&self, id: u128) -> Result<Option<String>>;
fn get_str(&self, id: u128) -> Result<Option<Self::StrType>>;
} }
pub trait StrContainer { pub trait StrContainer {
@ -733,8 +730,6 @@ impl Default for MemoryStrStore {
} }
impl StrLookup for MemoryStrStore { impl StrLookup for MemoryStrStore {
type StrType = String;
fn get_str(&self, id: u128) -> Result<Option<String>> { fn get_str(&self, id: u128) -> Result<Option<String>> {
//TODO: avoid copy by adding a lifetime limit to get_str //TODO: avoid copy by adding a lifetime limit to get_str
Ok(self.id2str.get(&id).cloned()) Ok(self.id2str.get(&id).cloned())
@ -1121,7 +1116,7 @@ impl<S: StrLookup> Decoder for S {
} }
} }
fn get_required_str<S: StrLookup>(lookup: &S, id: u128) -> Result<S::StrType> { fn get_required_str(lookup: &impl StrLookup, id: u128) -> Result<String> {
lookup.get_str(id)?.ok_or_else(|| { lookup.get_str(id)?.ok_or_else(|| {
format_err!( format_err!(
"Not able to find the string with id {} in the string store", "Not able to find the string with id {} in the string store",

@ -2,17 +2,10 @@ use crate::store::numeric_encoder::*;
use crate::store::{Store, StoreConnection, StoreRepositoryConnection, StoreTransaction}; use crate::store::{Store, StoreConnection, StoreRepositoryConnection, StoreTransaction};
use crate::{Repository, Result}; use crate::{Repository, Result};
use failure::format_err; use failure::format_err;
use rocksdb::ColumnFamily; use rocksdb::*;
use rocksdb::DBCompactionStyle;
use rocksdb::DBRawIterator;
use rocksdb::DBVector;
use rocksdb::Options;
use rocksdb::WriteBatch;
use rocksdb::DB;
use std::io::Cursor; use std::io::Cursor;
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::mem::swap; use std::mem::swap;
use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
@ -77,13 +70,13 @@ struct RocksDbStore {
#[derive(Clone)] #[derive(Clone)]
pub struct RocksDbStoreConnection<'a> { pub struct RocksDbStoreConnection<'a> {
store: &'a RocksDbStore, store: &'a RocksDbStore,
id2str_cf: ColumnFamily<'a>, id2str_cf: &'a ColumnFamily,
spog_cf: ColumnFamily<'a>, spog_cf: &'a ColumnFamily,
posg_cf: ColumnFamily<'a>, posg_cf: &'a ColumnFamily,
ospg_cf: ColumnFamily<'a>, ospg_cf: &'a ColumnFamily,
gspo_cf: ColumnFamily<'a>, gspo_cf: &'a ColumnFamily,
gpos_cf: ColumnFamily<'a>, gpos_cf: &'a ColumnFamily,
gosp_cf: ColumnFamily<'a>, gosp_cf: &'a ColumnFamily,
} }
impl RocksDbRepository { impl RocksDbRepository {
@ -139,14 +132,12 @@ impl<'a> Store for &'a RocksDbStore {
} }
impl StrLookup for RocksDbStoreConnection<'_> { impl StrLookup for RocksDbStoreConnection<'_> {
type StrType = RocksString; fn get_str(&self, id: u128) -> Result<Option<String>> {
fn get_str(&self, id: u128) -> Result<Option<RocksString>> {
Ok(self Ok(self
.store .store
.db .db
.get_cf(self.id2str_cf, &id.to_le_bytes())? .get_cf(self.id2str_cf, &id.to_le_bytes())?
.map(|v| RocksString { vec: v })) .map(|v| unsafe { String::from_utf8_unchecked(v) }))
} }
} }
@ -164,7 +155,11 @@ impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
buffer.write_spog_quad(quad)?; buffer.write_spog_quad(quad)?;
Ok(self.store.db.get_cf(self.spog_cf, &buffer)?.is_some()) Ok(self
.store
.db
.get_pinned_cf(self.spog_cf, &buffer)?
.is_some())
} }
fn quads_for_pattern<'b>( fn quads_for_pattern<'b>(
@ -417,7 +412,7 @@ impl<'a> RocksDbStoreConnection<'a> {
fn inner_quads( fn inner_quads(
&self, &self,
cf: ColumnFamily, cf: &ColumnFamily,
prefix: Vec<u8>, prefix: Vec<u8>,
decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a, decode: impl Fn(&[u8]) -> Result<EncodedQuad> + 'a,
) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> { ) -> Result<impl Iterator<Item = Result<EncodedQuad>> + 'a> {
@ -532,7 +527,7 @@ impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> {
} }
} }
fn get_cf<'a>(db: &'a DB, name: &str) -> Result<ColumnFamily<'a>> { fn get_cf<'a>(db: &'a DB, name: &str) -> Result<&'a ColumnFamily> {
db.cf_handle(name) db.cf_handle(name)
.ok_or_else(|| format_err!("column family {} not found", name)) .ok_or_else(|| format_err!("column family {} not found", name))
} }
@ -578,15 +573,13 @@ impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator
fn next(&mut self) -> Option<Result<EncodedQuad>> { fn next(&mut self) -> Option<Result<EncodedQuad>> {
if self.iter.valid() { if self.iter.valid() {
let result = unsafe { let result = self.iter.key().and_then(|key| {
self.iter.key_inner().and_then(|key| {
if key.starts_with(&self.prefix) { if key.starts_with(&self.prefix) {
Some((self.decode)(key)) Some((self.decode)(key))
} else { } else {
None None
} }
}) });
};
self.iter.next(); self.iter.next();
result result
} else { } else {
@ -595,30 +588,6 @@ impl<'a, F: Fn(&[u8]) -> Result<EncodedQuad>> Iterator for DecodingIndexIterator
} }
} }
pub struct RocksString {
vec: DBVector,
}
impl Deref for RocksString {
type Target = str;
fn deref(&self) -> &str {
unsafe { str::from_utf8_unchecked(&self.vec) }
}
}
impl ToString for RocksString {
fn to_string(&self) -> String {
self.deref().to_owned()
}
}
impl From<RocksString> for String {
fn from(val: RocksString) -> String {
val.deref().to_owned()
}
}
#[test] #[test]
fn repository() -> Result<()> { fn repository() -> Result<()> {
use crate::model::*; use crate::model::*;

Loading…
Cancel
Save