Adds an internal transaction system

Allows batching changes when loading a file
pull/10/head
Tpt 5 years ago
parent 88a97aa904
commit 3566aff1c7
  1. 161
      lib/src/store/memory.rs
  2. 52
      lib/src/store/mod.rs
  3. 193
      lib/src/store/rocksdb.rs

@ -91,6 +91,12 @@ impl<'a> StrContainer for &'a MemoryStore {
} }
impl<'a> StoreConnection for &'a MemoryStore { impl<'a> StoreConnection for &'a MemoryStore {
type Transaction = &'a MemoryStore;
fn transaction(&self) -> Result<&'a MemoryStore> {
Ok(self)
}
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
Ok(self Ok(self
.indexes()? .indexes()?
@ -104,6 +110,85 @@ impl<'a> StoreConnection for &'a MemoryStore {
})) }))
} }
/// Returns the quads matching a pattern, dispatching on which components are bound.
///
/// Each of the sixteen `Some`/`None` combinations of `subject`, `predicate`, `object`
/// and `graph_name` is forwarded to the lookup helper dedicated to that combination.
/// A fully bound pattern is answered with `contains` and yields at most one quad.
fn quads_for_pattern<'b>(
    &'b self,
    subject: Option<EncodedTerm>,
    predicate: Option<EncodedTerm>,
    object: Option<EncodedTerm>,
    graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
    match (subject, predicate, object, graph_name) {
        // Everything is bound: a plain membership test is enough.
        (Some(s), Some(p), Some(o), Some(g)) => {
            let quad = EncodedQuad::new(s, p, o, g);
            match self.contains(&quad) {
                Ok(true) => Box::new(once(Ok(quad))),
                Ok(false) => Box::new(empty()),
                Err(error) => Box::new(once(Err(error))),
            }
        }
        // Subject bound.
        (Some(s), Some(p), Some(o), None) => {
            wrap_error(self.quads_for_subject_predicate_object(s, p, o))
        }
        (Some(s), Some(p), None, Some(g)) => {
            wrap_error(self.quads_for_subject_predicate_graph(s, p, g))
        }
        (Some(s), Some(p), None, None) => wrap_error(self.quads_for_subject_predicate(s, p)),
        (Some(s), None, Some(o), Some(g)) => {
            wrap_error(self.quads_for_subject_object_graph(s, o, g))
        }
        (Some(s), None, Some(o), None) => wrap_error(self.quads_for_subject_object(s, o)),
        (Some(s), None, None, Some(g)) => wrap_error(self.quads_for_subject_graph(s, g)),
        (Some(s), None, None, None) => wrap_error(self.quads_for_subject(s)),
        // Subject unbound.
        (None, Some(p), Some(o), Some(g)) => {
            wrap_error(self.quads_for_predicate_object_graph(p, o, g))
        }
        (None, Some(p), Some(o), None) => wrap_error(self.quads_for_predicate_object(p, o)),
        (None, Some(p), None, Some(g)) => wrap_error(self.quads_for_predicate_graph(p, g)),
        (None, Some(p), None, None) => wrap_error(self.quads_for_predicate(p)),
        (None, None, Some(o), Some(g)) => wrap_error(self.quads_for_object_graph(o, g)),
        (None, None, Some(o), None) => wrap_error(self.quads_for_object(o)),
        (None, None, None, Some(g)) => wrap_error(self.quads_for_graph(g)),
        (None, None, None, None) => wrap_error(self.quads()),
    }
}
}
/// TODO: implement properly
impl<'a> StoreTransaction for &'a MemoryStore {
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut quad_indexes = self.indexes_mut()?; let mut quad_indexes = self.indexes_mut()?;
insert_into_quad_map( insert_into_quad_map(
@ -198,80 +283,8 @@ impl<'a> StoreConnection for &'a MemoryStore {
Ok(()) Ok(())
} }
fn quads_for_pattern<'b>( fn commit(self) -> Result<()> {
&'b self, Ok(())
subject: Option<EncodedTerm>,
predicate: Option<EncodedTerm>,
object: Option<EncodedTerm>,
graph_name: Option<EncodedTerm>,
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'b> {
match subject {
Some(subject) => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => {
let quad = EncodedQuad::new(subject, predicate, object, graph_name);
match self.contains(&quad) {
Ok(true) => Box::new(once(Ok(quad))),
Ok(false) => Box::new(empty()),
Err(error) => Box::new(once(Err(error))),
}
}
None => wrap_error(
self.quads_for_subject_predicate_object(subject, predicate, object),
),
},
None => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_predicate_graph(subject, predicate, graph_name),
),
None => wrap_error(self.quads_for_subject_predicate(subject, predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_subject_object_graph(subject, object, graph_name),
),
None => wrap_error(self.quads_for_subject_object(subject, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_subject_graph(subject, graph_name))
}
None => wrap_error(self.quads_for_subject(subject)),
},
},
},
None => match predicate {
Some(predicate) => match object {
Some(object) => match graph_name {
Some(graph_name) => wrap_error(
self.quads_for_predicate_object_graph(predicate, object, graph_name),
),
None => wrap_error(self.quads_for_predicate_object(predicate, object)),
},
None => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_predicate_graph(predicate, graph_name))
}
None => wrap_error(self.quads_for_predicate(predicate)),
},
},
None => match object {
Some(object) => match graph_name {
Some(graph_name) => {
wrap_error(self.quads_for_object_graph(object, graph_name))
}
None => wrap_error(self.quads_for_object(object)),
},
None => match graph_name {
Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)),
None => wrap_error(self.quads()),
},
},
},
}
} }
} }

@ -28,12 +28,15 @@ pub trait Store {
} }
/// A connection to a `Store` /// A connection to a `Store`
pub trait StoreConnection: StrContainer + StrLookup + Sized + Clone { pub trait StoreConnection: StrLookup + Sized + Clone {
fn contains(&self, quad: &EncodedQuad) -> Result<bool>; type Transaction: StoreTransaction;
fn insert(&mut self, quad: &EncodedQuad) -> Result<()>; /// Creates an edition transaction
/// TODO: current transaction implementations may write changes before `commit()` is called,
/// which is why this API is not exposed publicly yet
fn transaction(&self) -> Result<Self::Transaction>;
fn remove(&mut self, quad: &EncodedQuad) -> Result<()>; fn contains(&self, quad: &EncodedQuad) -> Result<bool>;
fn quads_for_pattern<'a>( fn quads_for_pattern<'a>(
&'a self, &'a self,
@ -44,6 +47,15 @@ pub trait StoreConnection: StrContainer + StrLookup + Sized + Clone {
) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>; ) -> Box<dyn Iterator<Item = Result<EncodedQuad>> + 'a>;
} }
/// A write transaction on a store.
///
/// Quads are added or removed through `insert` and `remove`, and the transaction is
/// ended by `commit`, which consumes it.
///
/// NOTE: the implementations in this crate may write changes to the store before
/// `commit` is called, so a transaction is not guaranteed to be atomic.
pub trait StoreTransaction: StrContainer + Sized {
/// Registers the insertion of `quad` in this transaction.
fn insert(&mut self, quad: &EncodedQuad) -> Result<()>;
/// Registers the removal of `quad` in this transaction.
fn remove(&mut self, quad: &EncodedQuad) -> Result<()>;
/// Ends the transaction, consuming it.
fn commit(self) -> Result<()>;
}
/// A `RepositoryConnection` from a `StoreConnection` /// A `RepositoryConnection` from a `StoreConnection`
#[derive(Clone)] #[derive(Clone)]
pub struct StoreRepositoryConnection<S: StoreConnection> { pub struct StoreRepositoryConnection<S: StoreConnection> {
@ -123,13 +135,17 @@ impl<S: StoreConnection> RepositoryConnection for StoreRepositoryConnection<S> {
} }
fn insert(&mut self, quad: &Quad) -> Result<()> { fn insert(&mut self, quad: &Quad) -> Result<()> {
let quad = self.inner.encode_quad(quad)?; let mut transaction = self.inner.transaction()?;
self.inner.insert(&quad) let quad = transaction.encode_quad(quad)?;
transaction.insert(&quad)?;
transaction.commit()
} }
fn remove(&mut self, quad: &Quad) -> Result<()> { fn remove(&mut self, quad: &Quad) -> Result<()> {
let quad = self.inner.encode_quad(quad)?; let mut transaction = self.inner.transaction()?;
self.inner.remove(&quad) let quad = transaction.encode_quad(quad)?;
transaction.remove(&quad)?;
transaction.commit()
} }
} }
@ -142,30 +158,32 @@ impl<S: StoreConnection> StoreRepositoryConnection<S> {
where where
P::Error: Send + Sync + 'static, P::Error: Send + Sync + 'static,
{ {
let mut transaction = self.inner.transaction()?;
let mut bnode_map = HashMap::default(); let mut bnode_map = HashMap::default();
let graph_name = if let Some(graph_name) = to_graph_name { let graph_name = if let Some(graph_name) = to_graph_name {
self.inner.encode_named_or_blank_node(graph_name)? transaction.encode_named_or_blank_node(graph_name)?
} else { } else {
EncodedTerm::DefaultGraph EncodedTerm::DefaultGraph
}; };
let tr = &mut transaction;
parser.parse_all(&mut move |t| { parser.parse_all(&mut move |t| {
let quad = self let quad = tr.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
.inner tr.insert(&quad)
.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?;
self.inner.insert(&quad)
})?; })?;
Ok(()) transaction.commit() //TODO: partials commits
} }
fn load_from_quad_parser<P: QuadsParser>(&mut self, mut parser: P) -> Result<()> fn load_from_quad_parser<P: QuadsParser>(&mut self, mut parser: P) -> Result<()>
where where
P::Error: Send + Sync + 'static, P::Error: Send + Sync + 'static,
{ {
let mut transaction = self.inner.transaction()?;
let mut bnode_map = HashMap::default(); let mut bnode_map = HashMap::default();
let tr = &mut transaction;
parser.parse_all(&mut move |q| { parser.parse_all(&mut move |q| {
let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?; let quad = tr.encode_rio_quad(q, &mut bnode_map)?;
self.inner.insert(&quad) tr.insert(&quad)
})?; })?;
Ok(()) transaction.commit() //TODO: partials commits
} }
} }

@ -1,5 +1,5 @@
use crate::store::numeric_encoder::*; use crate::store::numeric_encoder::*;
use crate::store::{Store, StoreConnection, StoreRepositoryConnection}; use crate::store::{Store, StoreConnection, StoreRepositoryConnection, StoreTransaction};
use crate::{Repository, Result}; use crate::{Repository, Result};
use failure::format_err; use failure::format_err;
use rocksdb::ColumnFamily; use rocksdb::ColumnFamily;
@ -11,6 +11,7 @@ use rocksdb::WriteBatch;
use rocksdb::DB; use rocksdb::DB;
use std::io::Cursor; use std::io::Cursor;
use std::iter::{empty, once}; use std::iter::{empty, once};
use std::mem::swap;
use std::ops::Deref; use std::ops::Deref;
use std::path::Path; use std::path::Path;
use std::str; use std::str;
@ -67,6 +68,8 @@ const COLUMN_FAMILIES: [&str; 7] = [
ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF, ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF,
]; ];
const MAX_TRANSACTION_SIZE: usize = 1024;
struct RocksDbStore { struct RocksDbStore {
db: DB, db: DB,
} }
@ -74,7 +77,6 @@ struct RocksDbStore {
#[derive(Clone)] #[derive(Clone)]
pub struct RocksDbStoreConnection<'a> { pub struct RocksDbStoreConnection<'a> {
store: &'a RocksDbStore, store: &'a RocksDbStore,
buffer: Vec<u8>,
id2str_cf: ColumnFamily<'a>, id2str_cf: ColumnFamily<'a>,
spog_cf: ColumnFamily<'a>, spog_cf: ColumnFamily<'a>,
posg_cf: ColumnFamily<'a>, posg_cf: ColumnFamily<'a>,
@ -110,7 +112,11 @@ impl RocksDbStore {
let new = Self { let new = Self {
db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?, db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?,
}; };
(&new).connection()?.set_first_strings()?;
let mut transaction = (&new).connection()?.transaction()?;
transaction.set_first_strings()?;
transaction.commit()?;
Ok(new) Ok(new)
} }
} }
@ -121,7 +127,6 @@ impl<'a> Store for &'a RocksDbStore {
fn connection(self) -> Result<RocksDbStoreConnection<'a>> { fn connection(self) -> Result<RocksDbStoreConnection<'a>> {
Ok(RocksDbStoreConnection { Ok(RocksDbStoreConnection {
store: self, store: self,
buffer: Vec::default(),
id2str_cf: get_cf(&self.db, ID2STR_CF)?, id2str_cf: get_cf(&self.db, ID2STR_CF)?,
spog_cf: get_cf(&self.db, SPOG_CF)?, spog_cf: get_cf(&self.db, SPOG_CF)?,
posg_cf: get_cf(&self.db, POSG_CF)?, posg_cf: get_cf(&self.db, POSG_CF)?,
@ -145,84 +150,23 @@ impl StrLookup for RocksDbStoreConnection<'_> {
} }
} }
impl StrContainer for RocksDbStoreConnection<'_> { impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { type Transaction = RocksDbStoreTransaction<'a>;
self.store
.db fn transaction(&self) -> Result<RocksDbStoreTransaction<'a>> {
.put_cf(self.id2str_cf, &key.to_le_bytes(), value)?; Ok(RocksDbStoreTransaction {
Ok(()) connection: self.clone(),
} batch: WriteBatch::default(),
buffer: Vec::default(),
})
} }
impl<'a> StoreConnection for RocksDbStoreConnection<'a> {
fn contains(&self, quad: &EncodedQuad) -> Result<bool> { fn contains(&self, quad: &EncodedQuad) -> Result<bool> {
let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE);
buffer.write_spog_quad(quad)?; buffer.write_spog_quad(quad)?;
Ok(self.store.db.get_cf(self.spog_cf, &buffer)?.is_some()) Ok(self.store.db.get_cf(self.spog_cf, &buffer)?.is_some())
} }
/// Inserts `quad` by putting its key into each of the six quad column families
/// (spog, posg, ospg, gspo, gpos, gosp) through a single `WriteBatch`.
///
/// `self.buffer` is reused to serialize each key and is cleared after every put.
/// Every key is stored with `EMPTY_BUF` as its value.
fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
self.buffer.write_spog_quad(quad)?;
batch.put_cf(self.spog_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_posg_quad(quad)?;
batch.put_cf(self.posg_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_ospg_quad(quad)?;
batch.put_cf(self.ospg_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gspo_quad(quad)?;
batch.put_cf(self.gspo_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gpos_quad(quad)?;
batch.put_cf(self.gpos_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
self.buffer.write_gosp_quad(quad)?;
batch.put_cf(self.gosp_cf, &self.buffer, &EMPTY_BUF)?;
self.buffer.clear();
// The six puts are handed to the database together as one batch.
self.store.db.write(batch)?; //TODO: check what's going on if the key already exists
Ok(())
}
/// Removes `quad` by deleting its key from each of the six quad column families
/// (spog, posg, ospg, gspo, gpos, gosp) through a single `WriteBatch`.
///
/// `self.buffer` is reused to serialize each key and is cleared after every delete.
fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
let mut batch = WriteBatch::default();
self.buffer.write_spog_quad(quad)?;
batch.delete_cf(self.spog_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_posg_quad(quad)?;
batch.delete_cf(self.posg_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_ospg_quad(quad)?;
batch.delete_cf(self.ospg_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gspo_quad(quad)?;
batch.delete_cf(self.gspo_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gpos_quad(quad)?;
batch.delete_cf(self.gpos_cf, &self.buffer)?;
self.buffer.clear();
self.buffer.write_gosp_quad(quad)?;
batch.delete_cf(self.gosp_cf, &self.buffer)?;
self.buffer.clear();
// The six deletes are handed to the database together as one batch.
self.store.db.write(batch)?;
Ok(())
}
fn quads_for_pattern<'b>( fn quads_for_pattern<'b>(
&'b self, &'b self,
subject: Option<EncodedTerm>, subject: Option<EncodedTerm>,
@ -490,6 +434,107 @@ impl<'a> RocksDbStoreConnection<'a> {
} }
} }
/// A write transaction on a RocksDB-backed store.
///
/// Changes are accumulated in `batch` and written to the database of `connection`.
pub struct RocksDbStoreTransaction<'a> {
/// Connection giving access to the database and to the column family handles.
connection: RocksDbStoreConnection<'a>,
/// Put/delete operations recorded so far and not yet written to the database.
batch: WriteBatch,
/// Scratch buffer reused to serialize quad keys.
buffer: Vec<u8>,
}
impl StrContainer for RocksDbStoreTransaction<'_> {
    /// Records the `key` -> `value` string mapping in the pending batch.
    ///
    /// The key is stored as the little-endian bytes of `key` in the id2str column family.
    fn insert_str(&mut self, key: u128, value: &str) -> Result<()> {
        let id2str_cf = self.connection.id2str_cf;
        let encoded_key = key.to_le_bytes();
        self.batch.put_cf(id2str_cf, &encoded_key, value)?;
        Ok(())
    }
}
impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> {
    /// Records the insertion of `quad` by putting its key into each of the six quad
    /// column families (spog, posg, ospg, gspo, gpos, gosp).
    ///
    /// `self.buffer` is reused to serialize each key and is cleared after every put.
    /// Every key is stored with `EMPTY_BUF` as its value.
    ///
    /// NOTE: once the pending batch grows past `MAX_TRANSACTION_SIZE` it is written to
    /// the database right away, i.e. before `commit` is called.
    fn insert(&mut self, quad: &EncodedQuad) -> Result<()> {
        self.buffer.write_spog_quad(quad)?;
        self.batch
            .put_cf(self.connection.spog_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.buffer.write_posg_quad(quad)?;
        self.batch
            .put_cf(self.connection.posg_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.buffer.write_ospg_quad(quad)?;
        self.batch
            .put_cf(self.connection.ospg_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.buffer.write_gspo_quad(quad)?;
        self.batch
            .put_cf(self.connection.gspo_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.buffer.write_gpos_quad(quad)?;
        self.batch
            .put_cf(self.connection.gpos_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.buffer.write_gosp_quad(quad)?;
        self.batch
            .put_cf(self.connection.gosp_cf, &self.buffer, &EMPTY_BUF)?;
        self.buffer.clear();

        self.flush_if_full()
    }

    /// Records the removal of `quad` by deleting its key from each of the six quad
    /// column families (spog, posg, ospg, gspo, gpos, gosp).
    ///
    /// `self.buffer` is reused to serialize each key and is cleared after every delete.
    ///
    /// NOTE: once the pending batch grows past `MAX_TRANSACTION_SIZE` it is written to
    /// the database right away, i.e. before `commit` is called.
    fn remove(&mut self, quad: &EncodedQuad) -> Result<()> {
        self.buffer.write_spog_quad(quad)?;
        self.batch
            .delete_cf(self.connection.spog_cf, &self.buffer)?;
        self.buffer.clear();

        self.buffer.write_posg_quad(quad)?;
        self.batch
            .delete_cf(self.connection.posg_cf, &self.buffer)?;
        self.buffer.clear();

        self.buffer.write_ospg_quad(quad)?;
        self.batch
            .delete_cf(self.connection.ospg_cf, &self.buffer)?;
        self.buffer.clear();

        self.buffer.write_gspo_quad(quad)?;
        self.batch
            .delete_cf(self.connection.gspo_cf, &self.buffer)?;
        self.buffer.clear();

        self.buffer.write_gpos_quad(quad)?;
        self.batch
            .delete_cf(self.connection.gpos_cf, &self.buffer)?;
        self.buffer.clear();

        self.buffer.write_gosp_quad(quad)?;
        self.batch
            .delete_cf(self.connection.gosp_cf, &self.buffer)?;
        self.buffer.clear();

        self.flush_if_full()
    }

    /// Writes whatever is still pending in the batch to the database and ends the
    /// transaction by consuming it.
    fn commit(self) -> Result<()> {
        self.connection.store.db.write(self.batch)?;
        Ok(())
    }
}

impl<'a> RocksDbStoreTransaction<'a> {
    /// Writes the pending batch to the database when its length exceeds
    /// `MAX_TRANSACTION_SIZE`, leaving a fresh empty batch in its place.
    ///
    /// Does nothing while the batch is at or below the limit.
    fn flush_if_full(&mut self) -> Result<()> {
        if self.batch.len() > MAX_TRANSACTION_SIZE {
            // `DB::write` takes the batch by value, so swap a new one into `self` first.
            let mut full_batch = WriteBatch::default();
            swap(&mut self.batch, &mut full_batch);
            self.connection.store.db.write(full_batch)?;
        }
        Ok(())
    }
}
fn get_cf<'a>(db: &'a DB, name: &str) -> Result<ColumnFamily<'a>> { fn get_cf<'a>(db: &'a DB, name: &str) -> Result<ColumnFamily<'a>> {
db.cf_handle(name) db.cf_handle(name)
.ok_or_else(|| format_err!("column family {} not found", name)) .ok_or_else(|| format_err!("column family {} not found", name))

Loading…
Cancel
Save