diff --git a/lib/src/store/rocksdb.rs b/lib/src/store/rocksdb.rs index 545465d5..9181de9c 100644 --- a/lib/src/store/rocksdb.rs +++ b/lib/src/store/rocksdb.rs @@ -1,13 +1,15 @@ //! Store based on the [RocksDB](https://rocksdb.org/) key-value database. +use crate::error::UnwrapInfallible; use crate::model::*; use crate::sparql::{GraphPattern, QueryOptions, QueryResult, SimplePreparedQuery}; use crate::store::numeric_encoder::*; use crate::store::{load_dataset, load_graph, ReadableEncodedStore, WritableEncodedStore}; use crate::{DatasetSyntax, GraphSyntax, Result}; use rocksdb::*; +use std::convert::Infallible; use std::io::{BufRead, Cursor}; -use std::mem::take; +use std::mem::{take, transmute}; use std::path::Path; use std::sync::Arc; use std::{fmt, str}; @@ -43,7 +45,7 @@ use std::{fmt, str}; /// assert_eq!(solutions.next().unwrap()?.get("s"), Some(&ex.into())); /// } /// # -/// # } +/// # }; /// # remove_dir_all("example.db")?; /// # Result::Ok(()) /// ``` @@ -68,18 +70,6 @@ const COLUMN_FAMILIES: [&str; 7] = [ const MAX_TRANSACTION_SIZE: usize = 1024; -#[derive(Clone)] -struct RocksDbStoreHandle<'a> { - db: &'a DB, - id2str_cf: &'a ColumnFamily, - spog_cf: &'a ColumnFamily, - posg_cf: &'a ColumnFamily, - ospg_cf: &'a ColumnFamily, - gspo_cf: &'a ColumnFamily, - gpos_cf: &'a ColumnFamily, - gosp_cf: &'a ColumnFamily, -} - impl RocksDbStore { /// Opens a `RocksDbStore` pub fn open(path: impl AsRef) -> Result { @@ -92,9 +82,9 @@ impl RocksDbStore { db: Arc::new(DB::open_cf(&options, path, &COLUMN_FAMILIES)?), }; - let mut transaction = new.handle().auto_transaction(); + let mut transaction = new.auto_batch_writer(); transaction.set_first_strings()?; - transaction.commit()?; + transaction.apply()?; Ok(new) } @@ -130,42 +120,39 @@ impl RocksDbStore { /// Retrieves quads with a filter on each quad component /// /// See `MemoryStore` for a usage example. 
- pub fn quads_for_pattern<'a>( - &'a self, + pub fn quads_for_pattern( + &self, subject: Option<&NamedOrBlankNode>, predicate: Option<&NamedNode>, object: Option<&Term>, graph_name: Option<&GraphName>, - ) -> impl Iterator> + 'a - where - Self: 'a, - { + ) -> impl Iterator> { let subject = subject.map(|s| s.into()); let predicate = predicate.map(|p| p.into()); let object = object.map(|o| o.into()); let graph_name = graph_name.map(|g| g.into()); - self.handle() - .encoded_quads_for_pattern(subject, predicate, object, graph_name) - .map(move |quad| self.decode_quad(&quad?)) + let store = self.clone(); + self.encoded_quads_for_pattern(subject, predicate, object, graph_name) + .map(move |quad| store.decode_quad(&quad?)) } /// Checks if this store contains a given quad pub fn contains(&self, quad: &Quad) -> Result { let quad = quad.into(); - self.handle().contains(&quad) + self.contains_encoded(&quad) } /// Returns the number of quads in the store pub fn len(&self) -> usize { self.db - .full_iterator_cf(self.handle().spog_cf, IteratorMode::Start) + .full_iterator_cf(self.spog_cf(), IteratorMode::Start) .count() } /// Returns if the store is empty pub fn is_empty(&self) -> bool { self.db - .full_iterator_cf(self.handle().spog_cf, IteratorMode::Start) + .full_iterator_cf(self.spog_cf(), IteratorMode::Start) .next() .is_none() } @@ -180,9 +167,15 @@ impl RocksDbStore { &'a self, f: impl FnOnce(&mut RocksDbTransaction<'a>) -> Result<()>, ) -> Result<()> { - let mut transaction = self.handle().transaction(); + let mut transaction = RocksDbTransaction { + inner: BatchWriter { + store: self, + batch: WriteBatch::default(), + buffer: Vec::default(), + }, + }; f(&mut transaction)?; - transaction.commit() + transaction.inner.apply() } /// Loads a graph file (i.e. 
triples) into the store @@ -198,9 +191,9 @@ impl RocksDbStore { to_graph_name: &GraphName, base_iri: Option<&str>, ) -> Result<()> { - let mut transaction = self.handle().auto_transaction(); + let mut transaction = self.auto_batch_writer(); load_graph(&mut transaction, reader, syntax, to_graph_name, base_iri)?; - transaction.commit() + transaction.apply() } /// Loads a dataset file (i.e. quads) into the store. @@ -215,102 +208,69 @@ impl RocksDbStore { syntax: DatasetSyntax, base_iri: Option<&str>, ) -> Result<()> { - let mut transaction = self.handle().auto_transaction(); + let mut transaction = self.auto_batch_writer(); load_dataset(&mut transaction, reader, syntax, base_iri)?; - transaction.commit() + transaction.apply() } /// Adds a quad to this store. pub fn insert(&self, quad: &Quad) -> Result<()> { - let mut transaction = self.handle().auto_transaction(); + let mut transaction = self.auto_batch_writer(); let quad = transaction.encode_quad(quad)?; transaction.insert_encoded(&quad)?; - transaction.commit() + transaction.apply() } /// Removes a quad from this store. 
pub fn remove(&self, quad: &Quad) -> Result<()> { - let mut transaction = self.handle().auto_transaction(); + let mut transaction = self.auto_batch_writer(); let quad = quad.into(); transaction.remove_encoded(&quad)?; - transaction.commit() - } - - fn handle(&self) -> RocksDbStoreHandle<'_> { - RocksDbStoreHandle { - db: &self.db, - id2str_cf: get_cf(&self.db, ID2STR_CF), - spog_cf: get_cf(&self.db, SPOG_CF), - posg_cf: get_cf(&self.db, POSG_CF), - ospg_cf: get_cf(&self.db, OSPG_CF), - gspo_cf: get_cf(&self.db, GSPO_CF), - gpos_cf: get_cf(&self.db, GPOS_CF), - gosp_cf: get_cf(&self.db, GOSP_CF), - } + transaction.apply() } -} -impl fmt::Display for RocksDbStore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - for t in self.quads_for_pattern(None, None, None, None) { - writeln!(f, "{}", t.map_err(|_| fmt::Error)?)?; - } - Ok(()) + fn id2str_cf(&self) -> &ColumnFamily { + get_cf(&self.db, ID2STR_CF) } -} -impl StrLookup for RocksDbStore { - type Error = crate::Error; + fn spog_cf(&self) -> &ColumnFamily { + get_cf(&self.db, SPOG_CF) + } - fn get_str(&self, id: StrHash) -> Result> { - Ok(self - .db - .get_cf(get_cf(&self.db, ID2STR_CF), &id.to_be_bytes())? - .map(String::from_utf8) - .transpose()?) 
+ fn posg_cf(&self) -> &ColumnFamily { + get_cf(&self.db, POSG_CF) } -} -impl ReadableEncodedStore for RocksDbStore { - fn encoded_quads_for_pattern<'a>( - &'a self, - subject: Option, - predicate: Option, - object: Option, - graph_name: Option, - ) -> Box> + 'a> { - Box::new( - self.handle() - .encoded_quads_for_pattern(subject, predicate, object, graph_name), - ) + fn ospg_cf(&self) -> &ColumnFamily { + get_cf(&self.db, OSPG_CF) } -} -impl<'a> RocksDbStoreHandle<'a> { - fn transaction(&self) -> RocksDbTransaction<'a> { - RocksDbTransaction { - inner: RocksDbInnerTransaction { - handle: self.clone(), - batch: WriteBatch::default(), - buffer: Vec::default(), - }, - } + fn gspo_cf(&self) -> &ColumnFamily { + get_cf(&self.db, GSPO_CF) + } + + fn gpos_cf(&self) -> &ColumnFamily { + get_cf(&self.db, GPOS_CF) + } + + fn gosp_cf(&self) -> &ColumnFamily { + get_cf(&self.db, GOSP_CF) } - fn auto_transaction(&self) -> RocksDbAutoTransaction<'a> { - RocksDbAutoTransaction { - inner: RocksDbInnerTransaction { - handle: self.clone(), + fn auto_batch_writer(&self) -> AutoBatchWriter<'_> { + AutoBatchWriter { + inner: BatchWriter { + store: self, batch: WriteBatch::default(), buffer: Vec::default(), }, } } - fn contains(&self, quad: &EncodedQuad) -> Result { + fn contains_encoded(&self, quad: &EncodedQuad) -> Result { let mut buffer = Vec::with_capacity(4 * WRITTEN_TERM_MAX_SIZE); write_spog_quad(&mut buffer, quad); - Ok(self.db.get_pinned_cf(self.spog_cf, &buffer)?.is_some()) + Ok(self.db.get_pinned_cf(self.spog_cf(), &buffer)?.is_some()) } fn encoded_quads_for_pattern( @@ -319,7 +279,7 @@ impl<'a> RocksDbStoreHandle<'a> { predicate: Option, object: Option, graph_name: Option, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { match subject { Some(subject) => match predicate { Some(predicate) => match object { @@ -376,11 +336,11 @@ impl<'a> RocksDbStoreHandle<'a> { } } - fn quads(&self) -> DecodingIndexIterator<'a> { + fn quads(&self) -> DecodingIndexIterator { 
self.spog_quads(Vec::default()) } - fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator<'a> { + fn quads_for_subject(&self, subject: EncodedTerm) -> DecodingIndexIterator { self.spog_quads(encode_term(subject)) } @@ -388,7 +348,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, predicate: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.spog_quads(encode_term_pair(subject, predicate)) } @@ -397,7 +357,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, predicate: EncodedTerm, object: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.spog_quads(encode_term_triple(subject, predicate, object)) } @@ -405,11 +365,11 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, object: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.ospg_quads(encode_term_pair(object, subject)) } - fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator<'a> { + fn quads_for_predicate(&self, predicate: EncodedTerm) -> DecodingIndexIterator { self.posg_quads(encode_term(predicate)) } @@ -417,15 +377,15 @@ impl<'a> RocksDbStoreHandle<'a> { &self, predicate: EncodedTerm, object: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.posg_quads(encode_term_pair(predicate, object)) } - fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator<'a> { + fn quads_for_object(&self, object: EncodedTerm) -> DecodingIndexIterator { self.ospg_quads(encode_term(object)) } - fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator<'a> { + fn quads_for_graph(&self, graph_name: EncodedTerm) -> DecodingIndexIterator { self.gspo_quads(encode_term(graph_name)) } @@ -433,7 +393,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, subject: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { 
self.gspo_quads(encode_term_pair(graph_name, subject)) } @@ -442,7 +402,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.gspo_quads(encode_term_triple(graph_name, subject, predicate)) } @@ -451,7 +411,7 @@ impl<'a> RocksDbStoreHandle<'a> { subject: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.gosp_quads(encode_term_triple(graph_name, object, subject)) } @@ -459,7 +419,7 @@ impl<'a> RocksDbStoreHandle<'a> { &self, predicate: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.gpos_quads(encode_term_pair(graph_name, predicate)) } @@ -468,7 +428,7 @@ impl<'a> RocksDbStoreHandle<'a> { predicate: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.gpos_quads(encode_term_triple(graph_name, predicate, object)) } @@ -476,50 +436,84 @@ impl<'a> RocksDbStoreHandle<'a> { &self, object: EncodedTerm, graph_name: EncodedTerm, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { self.gosp_quads(encode_term_pair(graph_name, object)) } - fn spog_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.spog_cf, prefix, QuadEncoding::SPOG) + fn spog_quads(&self, prefix: Vec) -> DecodingIndexIterator { + self.inner_quads(self.spog_cf(), prefix, QuadEncoding::SPOG) } - fn posg_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.posg_cf, prefix, QuadEncoding::POSG) + fn posg_quads(&self, prefix: Vec) -> DecodingIndexIterator { + self.inner_quads(self.posg_cf(), prefix, QuadEncoding::POSG) } - fn ospg_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.ospg_cf, prefix, QuadEncoding::OSPG) + fn ospg_quads(&self, prefix: Vec) -> DecodingIndexIterator 
{ + self.inner_quads(self.ospg_cf(), prefix, QuadEncoding::OSPG) } - fn gspo_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.gspo_cf, prefix, QuadEncoding::GSPO) + fn gspo_quads(&self, prefix: Vec) -> DecodingIndexIterator { + self.inner_quads(self.gspo_cf(), prefix, QuadEncoding::GSPO) } - fn gpos_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.gpos_cf, prefix, QuadEncoding::GPOS) + fn gpos_quads(&self, prefix: Vec) -> DecodingIndexIterator { + self.inner_quads(self.gpos_cf(), prefix, QuadEncoding::GPOS) } - fn gosp_quads(&self, prefix: Vec) -> DecodingIndexIterator<'a> { - self.inner_quads(self.gosp_cf, prefix, QuadEncoding::GOSP) + fn gosp_quads(&self, prefix: Vec) -> DecodingIndexIterator { + self.inner_quads(self.gosp_cf(), prefix, QuadEncoding::GOSP) } + #[allow(unsafe_code)] fn inner_quads( &self, cf: &ColumnFamily, prefix: Vec, encoding: QuadEncoding, - ) -> DecodingIndexIterator<'a> { + ) -> DecodingIndexIterator { let mut iter = self.db.raw_iterator_cf(cf); iter.seek(&prefix); DecodingIndexIterator { - iter, + iter: unsafe { StaticDBRowIterator::new(iter, self.db.clone()) }, // This is safe because the iterator belongs to DB prefix, encoding, } } } +impl fmt::Display for RocksDbStore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for t in self.quads_for_pattern(None, None, None, None) { + writeln!(f, "{}", t.map_err(|_| fmt::Error)?)?; + } + Ok(()) + } +} + +impl StrLookup for RocksDbStore { + type Error = crate::Error; + + fn get_str(&self, id: StrHash) -> Result> { + Ok(self + .db + .get_cf(self.id2str_cf(), &id.to_be_bytes())? + .map(String::from_utf8) + .transpose()?) 
+ } +} + +impl ReadableEncodedStore for RocksDbStore { + fn encoded_quads_for_pattern<'a>( + &'a self, + subject: Option, + predicate: Option, + object: Option, + graph_name: Option, + ) -> Box> + 'a> { + Box::new(self.encoded_quads_for_pattern(subject, predicate, object, graph_name)) + } +} + /// A prepared [SPARQL query](https://www.w3.org/TR/sparql11-query/) for the `RocksDbStore`. pub struct RocksDbPreparedQuery(SimplePreparedQuery); @@ -532,28 +526,7 @@ impl RocksDbPreparedQuery { /// Allows inserting and deleting quads during a transaction with the `RocksDbStore`. pub struct RocksDbTransaction<'a> { - inner: RocksDbInnerTransaction<'a>, -} - -impl StrContainer for RocksDbTransaction<'_> { - type Error = crate::Error; - - fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { - self.inner.insert_str(key, value); - Ok(()) - } -} - -impl WritableEncodedStore for RocksDbTransaction<'_> { - type Error = crate::Error; - - fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { - self.inner.insert(quad) - } - - fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { - self.inner.remove(quad) - } + inner: BatchWriter<'a>, } impl RocksDbTransaction<'_> { @@ -571,7 +544,7 @@ impl RocksDbTransaction<'_> { to_graph_name: &GraphName, base_iri: Option<&str>, ) -> Result<()> { - load_graph(self, reader, syntax, to_graph_name, base_iri) + load_graph(&mut self.inner, reader, syntax, to_graph_name, base_iri) } /// Loads a dataset file (i.e. quads) into the store. into the store during the transaction. @@ -587,136 +560,139 @@ impl RocksDbTransaction<'_> { syntax: DatasetSyntax, base_iri: Option<&str>, ) -> Result<()> { - load_dataset(self, reader, syntax, base_iri) + load_dataset(&mut self.inner, reader, syntax, base_iri) } /// Adds a quad to this store during the transaction. 
- pub fn insert(&mut self, quad: &Quad) -> Result<()> { - let quad = self.encode_quad(quad)?; - self.insert_encoded(&quad) + pub fn insert(&mut self, quad: &Quad) { + let quad = self.inner.encode_quad(quad).unwrap_infallible(); + self.inner.insert_encoded(&quad).unwrap_infallible() } /// Removes a quad from this store during the transaction. - pub fn remove(&mut self, quad: &Quad) -> Result<()> { + pub fn remove(&mut self, quad: &Quad) { let quad = quad.into(); - self.remove_encoded(&quad) - } - - fn commit(self) -> Result<()> { - self.inner.commit() + self.inner.remove_encoded(&quad).unwrap_infallible() } } -struct RocksDbAutoTransaction<'a> { - inner: RocksDbInnerTransaction<'a>, +struct BatchWriter<'a> { + store: &'a RocksDbStore, + batch: WriteBatch, + buffer: Vec, } -impl StrContainer for RocksDbAutoTransaction<'_> { - type Error = crate::Error; +impl StrContainer for BatchWriter<'_> { + type Error = Infallible; - fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { - self.inner.insert_str(key, value); + fn insert_str(&mut self, key: StrHash, value: &str) -> std::result::Result<(), Infallible> { + self.batch + .put_cf(self.store.id2str_cf(), &key.to_be_bytes(), value); Ok(()) } } -impl WritableEncodedStore for RocksDbAutoTransaction<'_> { - type Error = crate::Error; - - fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { - self.inner.insert(quad)?; - self.commit_if_big() - } - - fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { - self.inner.remove(quad)?; - self.commit_if_big() - } -} +impl WritableEncodedStore for BatchWriter<'_> { + type Error = Infallible; -impl RocksDbAutoTransaction<'_> { - fn commit(self) -> Result<()> { - self.inner.commit() - } - - fn commit_if_big(&mut self) -> Result<()> { - if self.inner.batch.len() > MAX_TRANSACTION_SIZE { - self.inner.handle.db.write(take(&mut self.inner.batch))?; - } - Ok(()) - } -} - -struct RocksDbInnerTransaction<'a> { - handle: RocksDbStoreHandle<'a>, - batch: 
WriteBatch, - buffer: Vec, -} - -impl RocksDbInnerTransaction<'_> { - fn insert_str(&mut self, key: StrHash, value: &str) { - self.batch - .put_cf(self.handle.id2str_cf, &key.to_be_bytes(), value) - } - - fn insert(&mut self, quad: &EncodedQuad) -> Result<()> { + fn insert_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> { write_spog_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.spog_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.spog_cf(), &self.buffer, &[]); self.buffer.clear(); write_posg_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.posg_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.posg_cf(), &self.buffer, &[]); self.buffer.clear(); write_ospg_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.ospg_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.ospg_cf(), &self.buffer, &[]); self.buffer.clear(); write_gspo_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.gspo_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.gspo_cf(), &self.buffer, &[]); self.buffer.clear(); write_gpos_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.gpos_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.gpos_cf(), &self.buffer, &[]); self.buffer.clear(); write_gosp_quad(&mut self.buffer, quad); - self.batch.put_cf(self.handle.gosp_cf, &self.buffer, &[]); + self.batch.put_cf(self.store.gosp_cf(), &self.buffer, &[]); self.buffer.clear(); Ok(()) } - fn remove(&mut self, quad: &EncodedQuad) -> Result<()> { + fn remove_encoded(&mut self, quad: &EncodedQuad) -> std::result::Result<(), Infallible> { write_spog_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.spog_cf, &self.buffer); + self.batch.delete_cf(self.store.spog_cf(), &self.buffer); self.buffer.clear(); write_posg_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.posg_cf, &self.buffer); + self.batch.delete_cf(self.store.posg_cf(), &self.buffer); self.buffer.clear(); 
write_ospg_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.ospg_cf, &self.buffer); + self.batch.delete_cf(self.store.ospg_cf(), &self.buffer); self.buffer.clear(); write_gspo_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.gspo_cf, &self.buffer); + self.batch.delete_cf(self.store.gspo_cf(), &self.buffer); self.buffer.clear(); write_gpos_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.gpos_cf, &self.buffer); + self.batch.delete_cf(self.store.gpos_cf(), &self.buffer); self.buffer.clear(); write_gosp_quad(&mut self.buffer, quad); - self.batch.delete_cf(self.handle.gosp_cf, &self.buffer); + self.batch.delete_cf(self.store.gosp_cf(), &self.buffer); self.buffer.clear(); Ok(()) } +} + +impl BatchWriter<'_> { + fn apply(self) -> Result<()> { + Ok(self.store.db.write(self.batch)?) + } +} + +struct AutoBatchWriter<'a> { + inner: BatchWriter<'a>, +} + +impl StrContainer for AutoBatchWriter<'_> { + type Error = crate::Error; + + fn insert_str(&mut self, key: StrHash, value: &str) -> Result<()> { + Ok(self.inner.insert_str(key, value)?) 
+ } +} + +impl WritableEncodedStore for AutoBatchWriter<'_> { + type Error = crate::Error; + + fn insert_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.insert_encoded(quad)?; + self.apply_if_big() + } + + fn remove_encoded(&mut self, quad: &EncodedQuad) -> Result<()> { + self.inner.remove_encoded(quad)?; + self.apply_if_big() + } +} + +impl AutoBatchWriter<'_> { + fn apply(self) -> Result<()> { + self.inner.apply() + } - fn commit(self) -> Result<()> { - self.handle.db.write(self.batch)?; + fn apply_if_big(&mut self) -> Result<()> { + if self.inner.batch.len() > MAX_TRANSACTION_SIZE { + self.inner.store.db.write(take(&mut self.inner.batch))?; + } Ok(()) } } @@ -757,20 +733,40 @@ fn encode_term_quad(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm, t4: Encod vec } -struct DecodingIndexIterator<'a> { - iter: DBRawIterator<'a>, +struct StaticDBRowIterator { + iter: DBRawIterator<'static>, + _db: Arc, // needed to ensure that DB still lives while iter is used +} + +impl StaticDBRowIterator { + /// Creates a static iterator from a non-static one by keeping an Arc reference to the database + /// Caller must ensure that the iterator belongs to the same database + /// + /// This unsafe method is required to get static iterators and ease the usage of the library + /// and make streaming Python bindings possible + #[allow(unsafe_code)] + unsafe fn new(iter: DBRawIterator<'_>, db: Arc) -> Self { + Self { + iter: transmute(iter), + _db: db, + } + } +} + +struct DecodingIndexIterator { + iter: StaticDBRowIterator, prefix: Vec, encoding: QuadEncoding, } -impl<'a> Iterator for DecodingIndexIterator<'a> { +impl Iterator for DecodingIndexIterator { type Item = Result; fn next(&mut self) -> Option> { - if let Some(key) = self.iter.key() { + if let Some(key) = self.iter.iter.key() { if key.starts_with(&self.prefix) { let result = self.encoding.decode(key).map_err(crate::Error::from); - self.iter.next(); + self.iter.iter.next(); Some(result) } else { None diff
--git a/wikibase/src/loader.rs b/wikibase/src/loader.rs index 20e56def..a1f211a1 100644 --- a/wikibase/src/loader.rs +++ b/wikibase/src/loader.rs @@ -266,7 +266,7 @@ impl WikibaseLoader { .quads_for_pattern(None, None, None, Some(&graph_name)) .collect::>>()?; for q in to_remove { - transaction.remove(&q)?; + transaction.remove(&q); } transaction.load_graph(