From 3566aff1c737516d37f591edea793368134cdd01 Mon Sep 17 00:00:00 2001 From: Tpt Date: Wed, 9 Oct 2019 13:38:16 +0200 Subject: [PATCH] Adds an internal transaction system Allows to batch changes when loading a file --- lib/src/store/memory.rs | 161 +++++++++++++++++--------------- lib/src/store/mod.rs | 52 +++++++---- lib/src/store/rocksdb.rs | 193 ++++++++++++++++++++++++--------------- 3 files changed, 241 insertions(+), 165 deletions(-) diff --git a/lib/src/store/memory.rs b/lib/src/store/memory.rs index c85e27e8..b0aaecb2 100644 --- a/lib/src/store/memory.rs +++ b/lib/src/store/memory.rs @@ -91,6 +91,12 @@ impl<'a> StrContainer for &'a MemoryStore { } impl<'a> StoreConnection for &'a MemoryStore { + type Transaction = &'a MemoryStore; + + fn transaction(&self) -> Result<&'a MemoryStore> { + Ok(self) + } + fn contains(&self, quad: &EncodedQuad) -> Result { Ok(self .indexes()? @@ -104,6 +110,85 @@ impl<'a> StoreConnection for &'a MemoryStore { })) } + fn quads_for_pattern<'b>( + &'b self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'b> { + match subject { + Some(subject) => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => { + let quad = EncodedQuad::new(subject, predicate, object, graph_name); + match self.contains(&quad) { + Ok(true) => Box::new(once(Ok(quad))), + Ok(false) => Box::new(empty()), + Err(error) => Box::new(once(Err(error))), + } + } + None => wrap_error( + self.quads_for_subject_predicate_object(subject, predicate, object), + ), + }, + None => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_predicate_graph(subject, predicate, graph_name), + ), + None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_subject_object_graph(subject, object, graph_name), + ), + None => wrap_error(self.quads_for_subject_object(subject, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_subject_graph(subject, graph_name)) + } + None => wrap_error(self.quads_for_subject(subject)), + }, + }, + }, + None => match predicate { + Some(predicate) => match object { + Some(object) => match graph_name { + Some(graph_name) => wrap_error( + self.quads_for_predicate_object_graph(predicate, object, graph_name), + ), + None => wrap_error(self.quads_for_predicate_object(predicate, object)), + }, + None => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) + } + None => wrap_error(self.quads_for_predicate(predicate)), + }, + }, + None => match object { + Some(object) => match graph_name { + Some(graph_name) => { + wrap_error(self.quads_for_object_graph(object, graph_name)) + } + None => wrap_error(self.quads_for_object(object)), + }, + None => match graph_name { + Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), + None => wrap_error(self.quads()), + }, + }, + }, + } + } +} + +/// TODO: implement properly +impl<'a> StoreTransaction for &'a MemoryStore { fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { let mut quad_indexes = self.indexes_mut()?; insert_into_quad_map( @@ -198,80 +283,8 @@ impl<'a> StoreConnection for &'a MemoryStore { Ok(()) } - fn quads_for_pattern<'b>( - &'b self, - subject: Option, - predicate: Option, - object: Option, - graph_name: Option, - ) -> Box> + 'b> { - match subject { - Some(subject) => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => { - let quad = EncodedQuad::new(subject, predicate, object, graph_name); - match self.contains(&quad) { - Ok(true) => Box::new(once(Ok(quad))), - Ok(false) => Box::new(empty()), - Err(error) => Box::new(once(Err(error))), - } - } - None => wrap_error( - self.quads_for_subject_predicate_object(subject, predicate, object), - ), - }, - None => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_subject_predicate_graph(subject, predicate, graph_name), - ), - None => wrap_error(self.quads_for_subject_predicate(subject, predicate)), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_subject_object_graph(subject, object, graph_name), - ), - None => wrap_error(self.quads_for_subject_object(subject, object)), - }, - None => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_subject_graph(subject, graph_name)) - } - None => wrap_error(self.quads_for_subject(subject)), - }, - }, - }, - None => match predicate { - Some(predicate) => match object { - Some(object) => match graph_name { - Some(graph_name) => wrap_error( - self.quads_for_predicate_object_graph(predicate, object, graph_name), - ), - None => wrap_error(self.quads_for_predicate_object(predicate, object)), - }, - None => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_predicate_graph(predicate, graph_name)) - } - None => wrap_error(self.quads_for_predicate(predicate)), - }, - }, - None => match object { - Some(object) => match graph_name { - Some(graph_name) => { - wrap_error(self.quads_for_object_graph(object, graph_name)) - } - None => wrap_error(self.quads_for_object(object)), - }, - None => match graph_name { - Some(graph_name) => wrap_error(self.quads_for_graph(graph_name)), - None => wrap_error(self.quads()), - }, - }, - }, - } + fn commit(self) -> Result<()> { + Ok(()) } } diff --git a/lib/src/store/mod.rs b/lib/src/store/mod.rs index 84343416..7b1a0260 100644 --- a/lib/src/store/mod.rs +++ b/lib/src/store/mod.rs @@ -28,12 +28,15 @@ pub trait Store { } /// A connection to a `Store` -pub trait StoreConnection: StrContainer + StrLookup + Sized + Clone { - fn contains(&self, quad: &EncodedQuad) -> Result; +pub trait StoreConnection: StrLookup + Sized + Clone { + type Transaction: StoreTransaction; - fn insert(&mut self, quad: &EncodedQuad) -> Result<()>; + /// Creates an edition transaction + /// TODO: current transaction implementations could commit before the call to commit() + /// It's why this API is not exposed publicly yet + fn transaction(&self) -> Result; - fn remove(&mut self, quad: &EncodedQuad) -> Result<()>; + fn contains(&self, quad: &EncodedQuad) -> Result; fn quads_for_pattern<'a>( &'a self, @@ -44,6 +47,15 @@ pub trait StoreConnection: StrContainer + StrLookup + Sized + Clone { ) -> Box> + 'a>; } +/// A transaction +pub trait StoreTransaction: StrContainer + Sized { + fn insert(&mut self, quad: &EncodedQuad) -> Result<()>; + + fn remove(&mut self, quad: &EncodedQuad) -> Result<()>; + + fn commit(self) -> Result<()>; +} + /// A `RepositoryConnection` from a `StoreConnection` #[derive(Clone)] pub struct StoreRepositoryConnection { @@ -123,13 +135,17 @@ impl RepositoryConnection for StoreRepositoryConnection { } fn insert(&mut self, quad: &Quad) -> Result<()> { - let quad = self.inner.encode_quad(quad)?; - self.inner.insert(&quad) + let mut transaction = self.inner.transaction()?; + let quad = transaction.encode_quad(quad)?; + transaction.insert(&quad)?; + transaction.commit() } fn remove(&mut self, quad: &Quad) -> Result<()> { - let quad = self.inner.encode_quad(quad)?; - self.inner.remove(&quad) + let mut transaction = self.inner.transaction()?; + let quad = transaction.encode_quad(quad)?; + transaction.remove(&quad)?; + transaction.commit() } } @@ -142,30 +158,32 @@ impl StoreRepositoryConnection { where P::Error: Send + Sync + 'static, { + let mut transaction = self.inner.transaction()?; let mut bnode_map = HashMap::default(); let graph_name = if let Some(graph_name) = to_graph_name { - self.inner.encode_named_or_blank_node(graph_name)? + transaction.encode_named_or_blank_node(graph_name)? } else { EncodedTerm::DefaultGraph }; + let tr = &mut transaction; parser.parse_all(&mut move |t| { - let quad = self - .inner - .encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?; - self.inner.insert(&quad) + let quad = tr.encode_rio_triple_in_graph(t, graph_name, &mut bnode_map)?; + tr.insert(&quad) })?; - Ok(()) + transaction.commit() //TODO: partials commits } fn load_from_quad_parser(&mut self, mut parser: P) -> Result<()> where P::Error: Send + Sync + 'static, { + let mut transaction = self.inner.transaction()?; let mut bnode_map = HashMap::default(); + let tr = &mut transaction; parser.parse_all(&mut move |q| { - let quad = self.inner.encode_rio_quad(q, &mut bnode_map)?; - self.inner.insert(&quad) + let quad = tr.encode_rio_quad(q, &mut bnode_map)?; + tr.insert(&quad) })?; - Ok(()) + transaction.commit() //TODO: partials commits } } diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 13f1695a..cb0b79de 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -1,5 +1,5 @@ use crate::store::numeric_encoder::*; -use crate::store::{Store, StoreConnection, StoreRepositoryConnection}; +use crate::store::{Store, StoreConnection, StoreRepositoryConnection, StoreTransaction}; use crate::{Repository, Result}; use failure::format_err; use rocksdb::ColumnFamily; @@ -11,6 +11,7 @@ use rocksdb::WriteBatch; use rocksdb::DB; use std::io::Cursor; use std::iter::{empty, once}; +use std::mem::swap; use std::ops::Deref; use std::path::Path; use std::str; @@ -67,6 +68,8 @@ const COLUMN_FAMILIES: [&str; 7] = [ ID2STR_CF, SPOG_CF, POSG_CF, OSPG_CF, GSPO_CF, GPOS_CF, GOSP_CF, ]; +const MAX_TRANSACTION_SIZE: usize = 1024; + struct RocksDbStore { db: DB, } @@ -74,7 +77,6 @@ struct RocksDbStore { #[derive(Clone)] pub struct RocksDbStoreConnection<'a> { store: &'a RocksDbStore, - buffer: Vec, id2str_cf: ColumnFamily<'a>, spog_cf: ColumnFamily<'a>, posg_cf: ColumnFamily<'a>, @@ -110,7 +112,11 @@ impl RocksDbStore { let new = Self { db: DB::open_cf(&options, path, &COLUMN_FAMILIES)?, }; - (&new).connection()?.set_first_strings()?; + + let mut transaction = (&new).connection()?.transaction()?; + transaction.set_first_strings()?; + transaction.commit()?; + Ok(new) } } @@ -121,7 +127,6 @@ impl<'a> Store for &'a RocksDbStore { fn connection(self) -> Result> { Ok(RocksDbStoreConnection { store: self, - buffer: Vec::default(), id2str_cf: get_cf(&self.db, ID2STR_CF)?, spog_cf: get_cf(&self.db, SPOG_CF)?, posg_cf: get_cf(&self.db, POSG_CF)?, @@ -145,84 +150,23 @@ impl StrLookup for RocksDbStoreConnection<'_> { } } -impl StrContainer for RocksDbStoreConnection<'_> { - fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { - self.store - .db - .put_cf(self.id2str_cf, &key.to_le_bytes(), value)?; - Ok(()) +impl<'a> StoreConnection for RocksDbStoreConnection<'a> { + type Transaction = RocksDbStoreTransaction<'a>; + + fn transaction(&self) -> Result> { + Ok(RocksDbStoreTransaction { + connection: self.clone(), + batch: WriteBatch::default(), + buffer: Vec::default(), + }) } -} -impl<'a> StoreConnection for RocksDbStoreConnection<'a> { fn contains(&self, quad: &EncodedQuad) -> Result { let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); buffer.write_spog_quad(quad)?; Ok(self.store.db.get_cf(self.spog_cf, &buffer)?.is_some()) } - fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { - let mut batch = WriteBatch::default(); - - self.buffer.write_spog_quad(quad)?; - batch.put_cf(self.spog_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.buffer.write_posg_quad(quad)?; - batch.put_cf(self.posg_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.buffer.write_ospg_quad(quad)?; - batch.put_cf(self.ospg_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.buffer.write_gspo_quad(quad)?; - batch.put_cf(self.gspo_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.buffer.write_gpos_quad(quad)?; - batch.put_cf(self.gpos_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.buffer.write_gosp_quad(quad)?; - batch.put_cf(self.gosp_cf, &self.buffer, &EMPTY_BUF)?; - self.buffer.clear(); - - self.store.db.write(batch)?; //TODO: check what's going on if the key already exists - Ok(()) - } - - fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { - let mut batch = WriteBatch::default(); - - self.buffer.write_spog_quad(quad)?; - batch.delete_cf(self.spog_cf, &self.buffer)?; - self.buffer.clear(); - - self.buffer.write_posg_quad(quad)?; - batch.delete_cf(self.posg_cf, &self.buffer)?; - self.buffer.clear(); - - self.buffer.write_ospg_quad(quad)?; - batch.delete_cf(self.ospg_cf, &self.buffer)?; - self.buffer.clear(); - - self.buffer.write_gspo_quad(quad)?; - batch.delete_cf(self.gspo_cf, &self.buffer)?; - self.buffer.clear(); - - self.buffer.write_gpos_quad(quad)?; - batch.delete_cf(self.gpos_cf, &self.buffer)?; - self.buffer.clear(); - - self.buffer.write_gosp_quad(quad)?; - batch.delete_cf(self.gosp_cf, &self.buffer)?; - self.buffer.clear(); - - self.store.db.write(batch)?; - Ok(()) - } - fn quads_for_pattern<'b>( &'b self, subject: Option, @@ -490,6 +434,107 @@ impl<'a> RocksDbStoreConnection<'a> { } } +pub struct RocksDbStoreTransaction<'a> { + connection: RocksDbStoreConnection<'a>, + batch: WriteBatch, + buffer: Vec, +} + +impl StrContainer for RocksDbStoreTransaction<'_> { + fn insert_str(&mut self, key: u128, value: &str) -> Result<()> { + self.batch + .put_cf(self.connection.id2str_cf, &key.to_le_bytes(), value)?; + Ok(()) + } +} + +impl<'a> StoreTransaction for RocksDbStoreTransaction<'a> { + fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { + self.buffer.write_spog_quad(quad)?; + self.batch + .put_cf(self.connection.spog_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + self.buffer.write_posg_quad(quad)?; + self.batch + .put_cf(self.connection.posg_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + self.buffer.write_ospg_quad(quad)?; + self.batch + .put_cf(self.connection.ospg_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + self.buffer.write_gspo_quad(quad)?; + self.batch + .put_cf(self.connection.gspo_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + self.buffer.write_gpos_quad(quad)?; + self.batch + .put_cf(self.connection.gpos_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + self.buffer.write_gosp_quad(quad)?; + self.batch + .put_cf(self.connection.gosp_cf, &self.buffer, &EMPTY_BUF)?; + self.buffer.clear(); + + if self.batch.len() > MAX_TRANSACTION_SIZE { + let mut tmp_batch = WriteBatch::default(); + swap(&mut self.batch, &mut tmp_batch); + self.connection.store.db.write(tmp_batch)?; + } + + Ok(()) + } + + fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { + self.buffer.write_spog_quad(quad)?; + self.batch + .delete_cf(self.connection.spog_cf, &self.buffer)?; + self.buffer.clear(); + + self.buffer.write_posg_quad(quad)?; + self.batch + .delete_cf(self.connection.posg_cf, &self.buffer)?; + self.buffer.clear(); + + self.buffer.write_ospg_quad(quad)?; + self.batch + .delete_cf(self.connection.ospg_cf, &self.buffer)?; + self.buffer.clear(); + + self.buffer.write_gspo_quad(quad)?; + self.batch + .delete_cf(self.connection.gspo_cf, &self.buffer)?; + self.buffer.clear(); + + self.buffer.write_gpos_quad(quad)?; + self.batch + .delete_cf(self.connection.gpos_cf, &self.buffer)?; + self.buffer.clear(); + + self.buffer.write_gosp_quad(quad)?; + self.batch + .delete_cf(self.connection.gosp_cf, &self.buffer)?; + self.buffer.clear(); + + if self.batch.len() > MAX_TRANSACTION_SIZE { + let mut tmp_batch = WriteBatch::default(); + swap(&mut self.batch, &mut tmp_batch); + self.connection.store.db.write(tmp_batch)?; + } + + Ok(()) + } + + fn commit(self) -> Result<()> { + self.connection.store.db.write(self.batch)?; + Ok(()) + } +} + fn get_cf<'a>(db: &'a DB, name: &str) -> Result> { db.cf_handle(name) .ok_or_else(|| format_err!("column family {} not found", name))