use byteorder::ByteOrder; use byteorder::LittleEndian; use rocksdb::ColumnFamily; use rocksdb::DBRawIterator; use rocksdb::Options; use rocksdb::WriteBatch; use rocksdb::DB; use std::io::Cursor; use std::ops::Deref; use std::path::Path; use std::str; use std::str::FromStr; use std::sync::Mutex; use store::encoded::EncodedQuadsStore; use store::encoded::StoreDataset; use store::numeric_encoder::*; use url::Url; use utils::MutexPoisonError; use Result; /// `rudf::model::Dataset` trait implementation based on the [RocksDB](https://rocksdb.org/) key-value store /// /// To use it, the `"rocksdb"` feature need to be activated. /// /// Usage example: /// ``` /// use rudf::store::RocksDbDataset; /// let dataset = RocksDbDataset::open("example.db").unwrap(); /// ``` pub type RocksDbDataset = StoreDataset; impl RocksDbDataset { pub fn open(path: impl AsRef) -> Result { Ok(Self::new_from_store(RocksDbStore::open(path)?)) } } const ID2STR_CF: &str = "id2str"; const STR2ID_CF: &str = "id2str"; const SPOG_CF: &str = "spog"; const POSG_CF: &str = "posg"; const OSPG_CF: &str = "ospg"; const EMPTY_BUF: [u8; 0] = [0 as u8; 0]; //TODO: indexes for the default graph and indexes for the named graphs (no more Optional and space saving) const COLUMN_FAMILIES: [&str; 5] = [ID2STR_CF, STR2ID_CF, SPOG_CF, POSG_CF, OSPG_CF]; pub struct RocksDbStore { db: DB, str_id_counter: Mutex, id2str_cf: SendColumnFamily, str2id_cf: SendColumnFamily, spog_cf: SendColumnFamily, posg_cf: SendColumnFamily, ospg_cf: SendColumnFamily, } impl RocksDbStore { fn open(path: impl AsRef) -> Result { let mut options = Options::default(); options.create_if_missing(true); options.create_missing_column_families(true); let db = DB::open_cf(&options, path, &COLUMN_FAMILIES)?; let id2str_cf = SendColumnFamily(get_cf(&db, STR2ID_CF)?); let str2id_cf = SendColumnFamily(get_cf(&db, ID2STR_CF)?); let spog_cf = SendColumnFamily(get_cf(&db, SPOG_CF)?); let posg_cf = SendColumnFamily(get_cf(&db, POSG_CF)?); let ospg_cf = SendColumnFamily(get_cf(&db, OSPG_CF)?); let new = Self { db, str_id_counter: Mutex::new(RocksDBCounter::new("bsc")), id2str_cf, str2id_cf, spog_cf, posg_cf, ospg_cf, }; new.set_first_strings()?; Ok(new) } } impl StringStore for RocksDbStore { fn insert_str(&self, value: &str) -> Result { let value = value.as_bytes(); Ok(if let Some(id) = self.db.get_cf(*self.str2id_cf, value)? { LittleEndian::read_u64(&id) } else { let id = self .str_id_counter .lock() .map_err(MutexPoisonError::from)? .get_and_increment(&self.db)? as u64; let id_bytes = to_bytes(id); let mut batch = WriteBatch::default(); batch.put_cf(*self.id2str_cf, &id_bytes, value)?; batch.put_cf(*self.str2id_cf, value, &id_bytes)?; self.db.write(batch)?; id }) } fn get_str(&self, id: u64) -> Result { let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?; if let Some(value) = value { Ok(str::from_utf8(&value)?.to_owned()) } else { Err(format_err!("value not found in the dictionary")) } } fn get_url(&self, id: u64) -> Result { let value = self.db.get_cf(*self.id2str_cf, &to_bytes(id))?; if let Some(value) = value { Ok(Url::from_str(str::from_utf8(&value)?)?) } else { Err(format_err!("value not found in the dictionary")) } } } impl EncodedQuadsStore for RocksDbStore { type QuadsIterator = SPOGIndexIterator; type QuadsForSubjectIterator = FilteringEncodedQuadsIterator; type QuadsForSubjectPredicateIterator = FilteringEncodedQuadsIterator; type QuadsForSubjectPredicateObjectIterator = FilteringEncodedQuadsIterator; type QuadsForSubjectObjectIterator = FilteringEncodedQuadsIterator; type QuadsForPredicateIterator = FilteringEncodedQuadsIterator; type QuadsForPredicateObjectIterator = FilteringEncodedQuadsIterator; type QuadsForObjectIterator = FilteringEncodedQuadsIterator; type QuadsForGraphIterator = InGraphQuadsIterator; type QuadsForSubjectGraphIterator = InGraphQuadsIterator>; type QuadsForSubjectPredicateGraphIterator = InGraphQuadsIterator>; type QuadsForSubjectObjectGraphIterator = InGraphQuadsIterator>; type QuadsForPredicateGraphIterator = InGraphQuadsIterator>; type QuadsForPredicateObjectGraphIterator = InGraphQuadsIterator>; type QuadsForObjectGraphIterator = InGraphQuadsIterator>; fn quads(&self) -> Result { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek_to_first(); Ok(SPOGIndexIterator { iter }) } fn quads_for_subject( &self, subject: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term(subject)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, filter: EncodedQuadPattern::new(Some(subject), None, None, None), }) } fn quads_for_subject_predicate( &self, subject: EncodedTerm, predicate: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(subject, predicate)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, filter: EncodedQuadPattern::new(Some(subject), Some(predicate), None, None), }) } fn quads_for_subject_predicate_object( &self, subject: EncodedTerm, predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_triple(subject, predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: SPOGIndexIterator { iter }, filter: EncodedQuadPattern::new(Some(subject), Some(predicate), Some(object), None), }) } fn quads_for_subject_object( &self, subject: EncodedTerm, object: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(object, subject)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, filter: EncodedQuadPattern::new(Some(subject), None, Some(object), None), }) } fn quads_for_predicate( &self, predicate: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.posg_cf)?; iter.seek(&encode_term(predicate)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, filter: EncodedQuadPattern::new(None, Some(predicate), None, None), }) } fn quads_for_predicate_object( &self, predicate: EncodedTerm, object: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.spog_cf)?; iter.seek(&encode_term_pair(predicate, object)?); Ok(FilteringEncodedQuadsIterator { iter: POSGIndexIterator { iter }, filter: EncodedQuadPattern::new(None, Some(predicate), Some(object), None), }) } fn quads_for_object( &self, object: EncodedTerm, ) -> Result> { let mut iter = self.db.raw_iterator_cf(*self.ospg_cf)?; iter.seek(&encode_term(object)?); Ok(FilteringEncodedQuadsIterator { iter: OSPGIndexIterator { iter }, filter: EncodedQuadPattern::new(None, None, Some(object), None), }) } fn quads_for_graph( &self, graph_name: EncodedTerm, ) -> Result> { Ok(InGraphQuadsIterator { iter: self.quads()?, graph_name, }) } fn quads_for_subject_graph( &self, subject: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_subject(subject)?, graph_name, }) } fn quads_for_subject_predicate_graph( &self, subject: EncodedTerm, predicate: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_subject_predicate(subject, predicate)?, graph_name, }) } fn quads_for_subject_object_graph( &self, subject: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_subject_object(subject, object)?, graph_name, }) } fn quads_for_predicate_graph( &self, predicate: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_predicate(predicate)?, graph_name, }) } fn quads_for_predicate_object_graph( &self, predicate: EncodedTerm, object: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_predicate_object(predicate, object)?, graph_name, }) } fn quads_for_object_graph( &self, object: EncodedTerm, graph_name: EncodedTerm, ) -> Result>> { Ok(InGraphQuadsIterator { iter: self.quads_for_object(object)?, graph_name, }) } fn contains(&self, quad: &EncodedQuad) -> Result { Ok(self .db .get_cf(*self.spog_cf, &encode_spog_quad(quad)?)? .is_some()) } fn insert(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); batch.put_cf(*self.spog_cf, &encode_spog_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(*self.posg_cf, &encode_posg_quad(quad)?, &EMPTY_BUF)?; batch.put_cf(*self.ospg_cf, &encode_ospg_quad(quad)?, &EMPTY_BUF)?; self.db.write(batch)?; //TODO: check what's going on if the key already exists Ok(()) } fn remove(&self, quad: &EncodedQuad) -> Result<()> { let mut batch = WriteBatch::default(); batch.delete_cf(*self.spog_cf, &encode_spog_quad(quad)?)?; batch.delete_cf(*self.posg_cf, &encode_posg_quad(quad)?)?; batch.delete_cf(*self.ospg_cf, &encode_ospg_quad(quad)?)?; self.db.write(batch)?; Ok(()) } } pub fn get_cf(db: &DB, name: &str) -> Result { db.cf_handle(name) .ok_or_else(|| format_err!("column family not found")) } struct RocksDBCounter { name: &'static str, } impl RocksDBCounter { fn new(name: &'static str) -> Self { Self { name } } fn get_and_increment(&self, db: &DB) -> Result { let value = db .get(self.name.as_bytes())? .map_or(0, |b| LittleEndian::read_u64(&b)); db.put(self.name.as_bytes(), &to_bytes(value + 1))?; Ok(value) } } struct EncodedQuadPattern { subject: Option, predicate: Option, object: Option, graph_name: Option, } impl EncodedQuadPattern { fn new( subject: Option, predicate: Option, object: Option, graph_name: Option, ) -> Self { Self { subject, predicate, object, graph_name, } } fn filter(&self, quad: &EncodedQuad) -> bool { if let Some(ref subject) = self.subject { if &quad.subject != subject { return false; } } if let Some(ref predicate) = self.predicate { if &quad.predicate != predicate { return false; } } if let Some(ref object) = self.object { if &quad.object != object { return false; } } if let Some(ref graph_name) = self.graph_name { if &quad.graph_name != graph_name { return false; } } true } } fn encode_term(t: EncodedTerm) -> Result> { let mut vec = Vec::default(); vec.write_term(t)?; Ok(vec) } fn encode_term_pair(t1: EncodedTerm, t2: EncodedTerm) -> Result> { let mut vec = Vec::default(); vec.write_term(t1)?; vec.write_term(t2)?; Ok(vec) } fn encode_term_triple(t1: EncodedTerm, t2: EncodedTerm, t3: EncodedTerm) -> Result> { let mut vec = Vec::default(); vec.write_term(t1)?; vec.write_term(t2)?; vec.write_term(t3)?; Ok(vec) } fn encode_spog_quad(quad: &EncodedQuad) -> Result> { let mut vec = Vec::default(); vec.write_spog_quad(quad)?; Ok(vec) } fn encode_posg_quad(quad: &EncodedQuad) -> Result> { let mut vec = Vec::default(); vec.write_posg_quad(quad)?; Ok(vec) } fn encode_ospg_quad(quad: &EncodedQuad) -> Result> { let mut vec = Vec::default(); vec.write_ospg_quad(quad)?; Ok(vec) } pub struct SPOGIndexIterator { iter: DBRawIterator, } impl Iterator for SPOGIndexIterator { type Item = Result; fn next(&mut self) -> Option> { self.iter.next(); unsafe { //This is safe because we are not keeping the buffer self.iter .key_inner() .map(|buffer| Cursor::new(buffer).read_spog_quad()) } } } pub struct POSGIndexIterator { iter: DBRawIterator, } impl Iterator for POSGIndexIterator { type Item = Result; fn next(&mut self) -> Option> { self.iter.next(); unsafe { //This is safe because we are not keeping the buffer self.iter .key_inner() .map(|buffer| Cursor::new(buffer).read_posg_quad()) } } } pub struct OSPGIndexIterator { iter: DBRawIterator, } impl Iterator for OSPGIndexIterator { type Item = Result; fn next(&mut self) -> Option> { self.iter.next(); unsafe { //This is safe because we are not keeping the buffer self.iter .key_inner() .map(|buffer| Cursor::new(buffer).read_ospg_quad()) } } } pub struct FilteringEncodedQuadsIterator>> { iter: I, filter: EncodedQuadPattern, } impl>> Iterator for FilteringEncodedQuadsIterator { type Item = Result; fn next(&mut self) -> Option> { self.iter.next().filter(|quad| match quad { Ok(quad) => self.filter.filter(quad), Err(_) => true, }) } } pub struct InGraphQuadsIterator>> { iter: I, graph_name: EncodedTerm, } impl>> Iterator for InGraphQuadsIterator { type Item = Result; fn next(&mut self) -> Option> { let graph_name = &self.graph_name; self.iter.find(|quad| match quad { Ok(quad) => graph_name == &quad.graph_name, Err(_) => true, }) } } fn to_bytes(int: u64) -> [u8; 8] { let mut buf = [0 as u8; 8]; LittleEndian::write_u64(&mut buf, int); buf } // TODO: very bad but I believe it is fine #[derive(Clone, Copy)] struct SendColumnFamily(ColumnFamily); unsafe impl Send for SendColumnFamily {} unsafe impl Sync for SendColumnFamily {} impl Deref for SendColumnFamily { type Target = ColumnFamily; fn deref(&self) -> &ColumnFamily { &self.0 } }