diff --git a/p2p-broker/src/broker_store_overlay.rs b/p2p-broker/src/broker_store_overlay.rs index 570ea11..34f287f 100644 --- a/p2p-broker/src/broker_store_overlay.rs +++ b/p2p-broker/src/broker_store_overlay.rs @@ -73,32 +73,33 @@ impl<'a> Overlay<'a> { if acc.exists() { return Err(StorageError::BackendError); } - store.put( - Self::PREFIX, - &to_vec(&id)?, - Some(Self::SECRET), - to_vec(&secret)?, - )?; - if repo.is_some() { - store.put( + store.write_transaction(&|tx| { + tx.put( Self::PREFIX, &to_vec(&id)?, - Some(Self::REPO), - to_vec(&repo.unwrap())?, + Some(Self::SECRET), + &to_vec(&secret)?, )?; - //TODO if failure, should remove the previously added SECRET property - } - let meta = OverlayMeta { - users: 1, - last_used: now_timestamp(), - }; - store.put( - Self::PREFIX, - &to_vec(&id)?, - Some(Self::META), - to_vec(&meta)?, - )?; - //TODO if failure, should remove the previously added SECRET and REPO properties + if repo.is_some() { + tx.put( + Self::PREFIX, + &to_vec(&id)?, + Some(Self::REPO), + & to_vec(&repo.unwrap())?, + )?; + } + let meta = OverlayMeta { + users: 1, + last_used: now_timestamp(), + }; + tx.put( + Self::PREFIX, + &to_vec(&id)?, + Some(Self::META), + &to_vec(&meta)?, + )?; + Ok(()) + })?; Ok(acc) } pub fn exists(&self) -> bool { diff --git a/p2p-broker/src/broker_store_peer.rs b/p2p-broker/src/broker_store_peer.rs index d6bac7a..7b28a21 100644 --- a/p2p-broker/src/broker_store_peer.rs +++ b/p2p-broker/src/broker_store_peer.rs @@ -74,18 +74,21 @@ impl<'a> Peer<'a> { if acc.exists() { return Err(StorageError::BackendError); } - store.put( - Self::PREFIX, - &to_vec(&id)?, - Some(Self::VERSION), - to_vec(&advert.version())?, - )?; - store.put( - Self::PREFIX, - &to_vec(&id)?, - Some(Self::ADVERT), - to_vec(&advert)?, - )?; + store.write_transaction(&|tx| { + tx.put( + Self::PREFIX, + &to_vec(&id)?, + Some(Self::VERSION), + & to_vec(&advert.version())?, + )?; + tx.put( + Self::PREFIX, + &to_vec(&id)?, + Some(Self::ADVERT), + & to_vec(&advert)?, + )?; + Ok(()) + })?; Ok(acc) } pub fn exists(&self) -> bool { diff --git a/p2p-broker/src/broker_store_topic.rs b/p2p-broker/src/broker_store_topic.rs index 6e95519..ba17fca 100644 --- a/p2p-broker/src/broker_store_topic.rs +++ b/p2p-broker/src/broker_store_topic.rs @@ -50,7 +50,7 @@ impl<'a> Topic<'a> { } Ok(opening) } - pub fn create(id: &TopicId, store: &'a dyn BrokerStore) -> Result, StorageError> { + pub fn create(id: &TopicId, store: &'a mut dyn BrokerStore) -> Result, StorageError> { let acc = Topic { id: id.clone(), store, diff --git a/p2p-repo/src/broker_store.rs b/p2p-repo/src/broker_store.rs index 7be106a..68d31dc 100644 --- a/p2p-repo/src/broker_store.rs +++ b/p2p-repo/src/broker_store.rs @@ -8,7 +8,45 @@ use crate::store::{StorageError}; -pub trait BrokerStore { +pub trait WriteTransaction : ReadTransaction { + + /// Save a property value to the store. + fn put( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError>; + + /// Replace the property of a key (single value) to the store. + fn replace( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError>; + + /// Delete a property from the store. + fn del(&mut self, prefix: u8, key: &Vec, suffix: Option) -> Result<(), StorageError>; + + /// Delete all properties of a key from the store. + fn del_all(&mut self, prefix: u8, key: &Vec, all_suffixes: &[u8]) -> Result<(), StorageError>; + + /// Delete a specific value for a property from the store. + fn del_property_value( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError>; + +} + +pub trait ReadTransaction { + /// Load a property from the store. fn get(&self, prefix: u8, key: &Vec, suffix: Option) -> Result, StorageError>; @@ -29,6 +67,12 @@ pub trait BrokerStore { value: Vec, ) -> Result<(), StorageError>; +} + +pub trait BrokerStore : ReadTransaction { + + fn write_transaction(&self, method: & dyn Fn(&mut dyn WriteTransaction) -> Result<(), StorageError> ) -> Result<(), StorageError> ; + /// Save a property value to the store. fn put( &self, @@ -61,4 +105,6 @@ pub trait BrokerStore { suffix: Option, value: Vec, ) -> Result<(), StorageError>; + + } diff --git a/p2p-stores-lmdb/src/broker_store.rs b/p2p-stores-lmdb/src/broker_store.rs index 0b2d634..01b23d4 100644 --- a/p2p-stores-lmdb/src/broker_store.rs +++ b/p2p-stores-lmdb/src/broker_store.rs @@ -15,6 +15,7 @@ use p2p_repo::utils::*; use debug_print::*; use std::path::Path; use std::path::PathBuf; +use std::sync::RwLockReadGuard; use std::sync::{Arc, RwLock}; use rkv::backend::{ @@ -28,6 +29,165 @@ use rkv::{ use serde::{Deserialize, Serialize}; use serde_bare::error::Error; + +pub struct LmdbTransaction<'a> { + + store: &'a LmdbBrokerStore, + writer: Option>>, + +} + +impl<'a> LmdbTransaction<'a> { + + fn commit(&mut self) { + self.writer.take().unwrap().commit().unwrap(); + } + +} + +impl<'a> ReadTransaction for LmdbTransaction<'a> { + /// Load a single value property from the store. + fn get(&self, prefix: u8, key: &Vec, suffix: Option) -> Result, StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + + let mut iter = self + .store.main_store + .get(self.writer.as_ref().unwrap(), property) + .map_err(|e| StorageError::BackendError)?; + match iter.next() { + Some(Ok(val)) => Ok(val.1.to_bytes().unwrap()), + Some(Err(_e)) => Err(StorageError::BackendError), + None => Err(StorageError::NotFound), + } + } + + /// Load all the values of a property from the store. + fn get_all( + &self, + prefix: u8, + key: &Vec, + suffix: Option, + ) -> Result>, StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + + let mut iter = self + .store.main_store + .get(self.writer.as_ref().unwrap(), property) + .map_err(|e| StorageError::BackendError)?; + let mut vector: Vec> = vec![]; + while let res = iter.next() { + vector.push(match res { + Some(Ok(val)) => val.1.to_bytes().unwrap(), + Some(Err(_e)) => return Err(StorageError::BackendError), + None => { + break; + } + }); + } + Ok(vector) + } + + /// Check if a specific value exists for a property from the store. + fn has_property_value( + &self, + prefix: u8, + key: &Vec, + suffix: Option, + value: Vec, + ) -> Result<(), StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + + let exists = self + .store.main_store + .get_key_value(self.writer.as_ref().unwrap(), property, &Value::Blob(value.as_slice())) + .map_err(|e| StorageError::BackendError)?; + if exists { + Ok(()) + } else { + Err(StorageError::NotFound) + } + } + +} + +impl<'a> WriteTransaction for LmdbTransaction<'a> { + + /// Save a property value to the store. + fn put( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + self.store.main_store + .put(self.writer.as_mut().unwrap(), property, &Value::Blob(value.as_slice())) + .map_err(|e| StorageError::BackendError)?; + + Ok(()) + } + + /// Replace the property of a key (single value) to the store. + fn replace( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + + self.store.main_store + .delete_all(self.writer.as_mut().unwrap(), property.clone()) + .map_err(|e| StorageError::BackendError)?; + + self.store.main_store + .put(self.writer.as_mut().unwrap(), property, &Value::Blob(value.as_slice())) + .map_err(|e| StorageError::BackendError)?; + + Ok(()) + } + + /// Delete a property from the store. + fn del(&mut self, prefix: u8, key: &Vec, suffix: Option) -> Result<(), StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + self.store.main_store + .delete_all(self.writer.as_mut().unwrap(), property) + .map_err(|e| StorageError::BackendError)?; + + Ok(()) + } + + /// Delete a specific value for a property from the store. + fn del_property_value( + &mut self, + prefix: u8, + key: &Vec, + suffix: Option, + value: &Vec, + ) -> Result<(), StorageError> { + let property = LmdbBrokerStore::compute_property(prefix, key, suffix); + self.store.main_store + .delete(self.writer.as_mut().unwrap(), property, &Value::Blob(value.as_slice())) + .map_err(|e| StorageError::BackendError)?; + + Ok(()) + } + + /// Delete all properties of a key from the store. + fn del_all(&mut self, prefix: u8, key: &Vec, all_suffixes: &[u8]) -> Result<(), StorageError> { + for suffix in all_suffixes { + self.del(prefix, key, Some(*suffix))?; + } + if all_suffixes.is_empty() { + self.del(prefix, key, None)?; + } + Ok(()) + } +} + + pub struct LmdbBrokerStore { /// the main store where all the properties of keys are stored main_store: MultiStore, @@ -37,7 +197,8 @@ pub struct LmdbBrokerStore { path: String, } -impl BrokerStore for LmdbBrokerStore { +impl ReadTransaction for LmdbBrokerStore { + /// Load a single value property from the store. fn get(&self, prefix: u8, key: &Vec, suffix: Option) -> Result, StorageError> { let property = Self::compute_property(prefix, key, suffix); @@ -103,6 +264,29 @@ impl BrokerStore for LmdbBrokerStore { } } + +} + +impl BrokerStore for LmdbBrokerStore { + + fn write_transaction(&self, method: & dyn Fn(&mut dyn WriteTransaction) -> Result<(), StorageError> )-> Result<(), StorageError> { + + let lock = self.environment.read().unwrap(); + let writer = lock.write().unwrap(); + + let mut transaction = LmdbTransaction { + store: self, + writer: Some(writer), + }; + let res = method(&mut transaction); + + if res.is_ok() { + transaction.commit(); + } + res + + } + /// Save a property value to the store. fn put( &self, @@ -111,16 +295,10 @@ impl BrokerStore for LmdbBrokerStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - let property = Self::compute_property(prefix, key, suffix); - let lock = self.environment.read().unwrap(); - let mut writer = lock.write().unwrap(); - self.main_store - .put(&mut writer, property, &Value::Blob(value.as_slice())) - .map_err(|e| StorageError::BackendError)?; - writer.commit().unwrap(); - - Ok(()) + self.write_transaction(&|tx| { + tx.put(prefix,key,suffix,&value) + }) } /// Replace the property of a key (single value) to the store. @@ -131,34 +309,17 @@ impl BrokerStore for LmdbBrokerStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - let property = Self::compute_property(prefix, key, suffix); - let lock = self.environment.read().unwrap(); - let mut writer = lock.write().unwrap(); - self.main_store - .delete_all(&mut writer, property.clone()) - .map_err(|e| StorageError::BackendError)?; - - self.main_store - .put(&mut writer, property, &Value::Blob(value.as_slice())) - .map_err(|e| StorageError::BackendError)?; - writer.commit().unwrap(); - - Ok(()) + self.write_transaction(&|tx| { + tx.replace(prefix,key,suffix,&value) + }) } /// Delete a property from the store. fn del(&self, prefix: u8, key: &Vec, suffix: Option) -> Result<(), StorageError> { - let property = Self::compute_property(prefix, key, suffix); - let lock = self.environment.read().unwrap(); - let mut writer = lock.write().unwrap(); - self.main_store - .delete_all(&mut writer, property) - .map_err(|e| StorageError::BackendError)?; - - writer.commit().unwrap(); - - Ok(()) + self.write_transaction(&|tx| { + tx.del(prefix,key,suffix) + }) } /// Delete a specific value for a property from the store. @@ -169,16 +330,9 @@ impl BrokerStore for LmdbBrokerStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - let property = Self::compute_property(prefix, key, suffix); - let lock = self.environment.read().unwrap(); - let mut writer = lock.write().unwrap(); - self.main_store - .delete(&mut writer, property, &Value::Blob(value.as_slice())) - .map_err(|e| StorageError::BackendError)?; - - writer.commit().unwrap(); - - Ok(()) + self.write_transaction(&|tx| { + tx.del_property_value(prefix,key,suffix, &value) + }) } /// Delete all properties of a key from the store.