From 949d3ba519a1e150658e439a65c3197f81919bd6 Mon Sep 17 00:00:00 2001 From: Niko PLP Date: Thu, 3 Aug 2023 19:01:49 +0300 Subject: [PATCH] added transaction for get_or_create_single_key --- README.md | 2 +- p2p-broker/src/broker_store/account.rs | 4 +- p2p-broker/src/broker_store/invitation.rs | 4 +- p2p-broker/src/broker_store/overlay.rs | 2 +- p2p-broker/src/broker_store/peer.rs | 4 +- p2p-broker/src/broker_store/wallet.rs | 59 ++++++++++++++--------- p2p-repo/src/kcv_store.rs | 2 +- stores-lmdb/src/kcv_store.rs | 10 ++-- 8 files changed, 50 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index f8dc310..c6335d4 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Read our [getting started guide](https://docs.nextgraph.org/en/getting-started/) ## For contributors -- [Install Rust](https://www.rust-lang.org/tools/install) minimum required 1.64.0 +- [Install Rust](https://www.rust-lang.org/tools/install) minimum required MSRV 1.64.0 - [Install Nodejs](https://nodejs.org/en/download/) until this [PR](https://github.com/rustwasm/wasm-pack/pull/1271) is accepted, will have to install wasm-pack this way: diff --git a/p2p-broker/src/broker_store/account.rs b/p2p-broker/src/broker_store/account.rs index aa40791..30ee14e 100644 --- a/p2p-broker/src/broker_store/account.rs +++ b/p2p-broker/src/broker_store/account.rs @@ -116,7 +116,7 @@ impl<'a> Account<'a> { let info_ser = to_vec(info)?; - self.store.write_transaction(&|tx| { + self.store.write_transaction(&mut |tx| { if tx .has_property_value( Self::PREFIX, @@ -218,7 +218,7 @@ impl<'a> Account<'a> { } pub fn del(&self) -> Result<(), StorageError> { - self.store.write_transaction(&|tx| { + self.store.write_transaction(&mut |tx| { if let Ok(clients) = tx.get_all(Self::PREFIX, &to_vec(&self.id)?, Some(Self::CLIENT)) { for client in clients { tx.del_all(Self::PREFIX_CLIENT, &client, &Self::ALL_CLIENT_PROPERTIES)?; diff --git a/p2p-broker/src/broker_store/invitation.rs b/p2p-broker/src/broker_store/invitation.rs index e53aead..4ed1340 100644 --- a/p2p-broker/src/broker_store/invitation.rs +++ b/p2p-broker/src/broker_store/invitation.rs @@ -74,7 +74,7 @@ impl<'a> Invitation<'a> { return Err(StorageError::BackendError); } let mut value = to_vec(&(code_type, expiry, memo.clone()))?; - store.write_transaction(&|tx| { + store.write_transaction(&mut |tx| { tx.put(Self::PREFIX, &to_vec(code)?, Some(Self::TYPE), &value)?; Ok(()) })?; @@ -168,7 +168,7 @@ impl<'a> Invitation<'a> { } pub fn del(&self) -> Result<(), StorageError> { - self.store.write_transaction(&|tx| { + self.store.write_transaction(&mut |tx| { tx.del_all(Self::PREFIX, &to_vec(&self.id)?, &Self::ALL_PROPERTIES)?; Ok(()) }) diff --git a/p2p-broker/src/broker_store/overlay.rs b/p2p-broker/src/broker_store/overlay.rs index 3347688..b18a694 100644 --- a/p2p-broker/src/broker_store/overlay.rs +++ b/p2p-broker/src/broker_store/overlay.rs @@ -73,7 +73,7 @@ impl<'a> Overlay<'a> { if acc.exists() { return Err(StorageError::BackendError); } - store.write_transaction(&|tx| { + store.write_transaction(&mut |tx| { tx.put( Self::PREFIX, &to_vec(&id)?, diff --git a/p2p-broker/src/broker_store/peer.rs b/p2p-broker/src/broker_store/peer.rs index 1e5240c..fea8648 100644 --- a/p2p-broker/src/broker_store/peer.rs +++ b/p2p-broker/src/broker_store/peer.rs @@ -71,7 +71,7 @@ impl<'a> Peer<'a> { if acc.exists() { return Err(StorageError::BackendError); } - store.write_transaction(&|tx| { + store.write_transaction(&mut |tx| { tx.put( Self::PREFIX, &to_vec(&id)?, @@ -128,7 +128,7 @@ impl<'a> Peer<'a> { if current_advert.version() >= advert.version() { return Ok(()); } - self.store.write_transaction(&|tx| { + self.store.write_transaction(&mut |tx| { tx.replace( Self::PREFIX, &to_vec(&self.id)?, diff --git a/p2p-broker/src/broker_store/wallet.rs b/p2p-broker/src/broker_store/wallet.rs index 8b0fc34..fc12933 100644 --- a/p2p-broker/src/broker_store/wallet.rs +++ b/p2p-broker/src/broker_store/wallet.rs @@ -11,6 +11,7 @@ use p2p_net::types::*; use p2p_repo::kcv_store::KCVStore; +use p2p_repo::kcv_store::WriteTransaction; use p2p_repo::log::*; use p2p_repo::store::*; use p2p_repo::types::*; @@ -44,28 +45,30 @@ impl<'a> Wallet<'a> { prefix: u8, key: &Vec, ) -> Result { - // FIXME. this get or create is not using a transaction, because calls will be made from the broker, that is behind a mutex. - // if this was to change, we should make the get and put inside one transaction. - let get = self - .store - .get(prefix, key, Some(Self::SUFFIX_FOR_EXIST_CHECK)); - match get { - Err(e) => { - if e == StorageError::NotFound { - self.create_single_key(prefix, key) - } else { - log_debug!("Error while creating single key {}", e); - Err(StorageError::BackendError) + let mut result: Option = None; + self.store.write_transaction(&mut |tx| { + let got = tx.get(prefix, key, Some(Self::SUFFIX_FOR_EXIST_CHECK)); + match got { + Err(e) => { + if e == StorageError::NotFound { + let res = Self::create_single_key(tx, prefix, key)?; + result = Some(res); + } else { + log_debug!("Error while creating single key {}", e); + return Err(StorageError::BackendError); + } + } + Ok(p) => { + let k: SymKey = p + .as_slice() + .try_into() + .map_err(|_| StorageError::BackendError)?; + result = Some(k); } } - Ok(p) => { - let k: SymKey = p - .as_slice() - .try_into() - .map_err(|_| StorageError::BackendError)?; - Ok(k) - } - } + Ok(()) + })?; + Ok(result.unwrap()) } pub fn get_or_create_user_key(&self, user: &UserId) -> Result { @@ -76,10 +79,14 @@ impl<'a> Wallet<'a> { self.get_or_create_single_key(Self::PREFIX_USER, &to_vec(overlay)?) } - pub fn create_single_key(&self, prefix: u8, key: &Vec) -> Result { + pub fn create_single_key( + tx: &mut dyn WriteTransaction, + prefix: u8, + key: &Vec, + ) -> Result { let symkey = SymKey::random(); let vec = symkey.slice().to_vec(); - self.store.put(prefix, key, Some(Self::SYM_KEY), vec)?; + tx.put(prefix, key, Some(Self::SYM_KEY), &vec)?; Ok(symkey) } pub fn exists_single_key(&self, prefix: u8, key: &Vec) -> bool { @@ -92,7 +99,13 @@ impl<'a> Wallet<'a> { self.exists_single_key(Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec()) } pub fn create_accounts_key(&self) -> Result { - self.create_single_key(Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec()) + let mut result: Option = None; + self.store.write_transaction(&mut |tx| { + let res = Self::create_single_key(tx, Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec())?; + result = Some(res); + Ok(()) + })?; + Ok(result.unwrap()) } pub fn get_or_create_peers_key(&self) -> Result { self.get_or_create_single_key(Self::PREFIX, &Self::KEY_PEERS.to_vec()) diff --git a/p2p-repo/src/kcv_store.rs b/p2p-repo/src/kcv_store.rs index afa4ee2..502e32c 100644 --- a/p2p-repo/src/kcv_store.rs +++ b/p2p-repo/src/kcv_store.rs @@ -81,7 +81,7 @@ pub trait ReadTransaction { pub trait KCVStore: ReadTransaction { fn write_transaction( &self, - method: &dyn Fn(&mut dyn WriteTransaction) -> Result<(), StorageError>, + method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>, ) -> Result<(), StorageError>; /// Save a property value to the store. diff --git a/stores-lmdb/src/kcv_store.rs b/stores-lmdb/src/kcv_store.rs index ae00520..a7729b4 100644 --- a/stores-lmdb/src/kcv_store.rs +++ b/stores-lmdb/src/kcv_store.rs @@ -364,7 +364,7 @@ impl ReadTransaction for LmdbKCVStore { impl KCVStore for LmdbKCVStore { fn write_transaction( &self, - method: &dyn Fn(&mut dyn WriteTransaction) -> Result<(), StorageError>, + method: &mut dyn FnMut(&mut dyn WriteTransaction) -> Result<(), StorageError>, ) -> Result<(), StorageError> { let lock = self.environment.read().unwrap(); let writer = lock.write().unwrap(); @@ -389,7 +389,7 @@ impl KCVStore for LmdbKCVStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - self.write_transaction(&|tx| tx.put(prefix, key, suffix, &value)) + self.write_transaction(&mut |tx| tx.put(prefix, key, suffix, &value)) } /// Replace the property of a key (single value) to the store. @@ -400,12 +400,12 @@ impl KCVStore for LmdbKCVStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - self.write_transaction(&|tx| tx.replace(prefix, key, suffix, &value)) + self.write_transaction(&mut |tx| tx.replace(prefix, key, suffix, &value)) } /// Delete a property from the store. fn del(&self, prefix: u8, key: &Vec, suffix: Option) -> Result<(), StorageError> { - self.write_transaction(&|tx| tx.del(prefix, key, suffix)) + self.write_transaction(&mut |tx| tx.del(prefix, key, suffix)) } /// Delete a specific value for a property from the store. @@ -416,7 +416,7 @@ impl KCVStore for LmdbKCVStore { suffix: Option, value: Vec, ) -> Result<(), StorageError> { - self.write_transaction(&|tx| tx.del_property_value(prefix, key, suffix, &value)) + self.write_transaction(&mut |tx| tx.del_property_value(prefix, key, suffix, &value)) } /// Delete all properties of a key from the store.