refactor helper Class/Column for KVC storage

master
Niko PLP 2 months ago
parent a90264e391
commit 19c627e1b9
  1. 1
      ng-broker/src/actors/mod.rs
  2. 2
      ng-broker/src/lib.rs
  3. 9
      ng-broker/src/server_broker.rs
  4. 38
      ng-broker/src/server_storage/admin/account.rs
  5. 31
      ng-broker/src/server_storage/admin/invitation.rs
  6. 14
      ng-broker/src/server_storage/admin/wallet.rs
  7. 36
      ng-broker/src/server_storage/core/overlay.rs
  8. 32
      ng-broker/src/server_storage/core/peer.rs
  9. 243
      ng-broker/src/server_storage/core/topic.rs
  10. 2
      ng-net/src/actors/admin/add_invitation.rs
  11. 2
      ng-net/src/actors/admin/add_user.rs
  12. 2
      ng-net/src/actors/admin/del_user.rs
  13. 2
      ng-net/src/actors/admin/list_invitations.rs
  14. 2
      ng-net/src/actors/admin/list_users.rs
  15. 14
      ng-net/src/actors/admin/mod.rs
  16. 17
      ng-net/src/actors/mod.rs
  17. 2
      ng-net/src/types.rs
  18. 337
      ng-repo/src/kcv_storage.rs
  19. 12
      ng-repo/src/types.rs
  20. 5
      ng-verifier/src/user_storage/branch.rs
  21. 13
      ng-verifier/src/user_storage/mod.rs
  22. 2
      ng-verifier/src/user_storage/repo.rs
  23. 2
      ngaccount/src/main.rs
  24. 2
      ngcli/src/main.rs

@ -11,3 +11,5 @@ pub mod server_storage;
pub mod rocksdb_server_storage;
pub mod server_ws;
pub mod actors;

@ -22,16 +22,15 @@ use ng_repo::{
use crate::rocksdb_server_storage::RocksDbServerStorage;
struct TopicInfo {
/// can be None if the Broker is not currently serving this topic for its clients.
repo: Option<RepoHash>,
repo: RepoHash,
publisher_advert: Option<PublisherAdvert>,
current_heads: Vec<ObjectId>,
current_heads: HashSet<ObjectId>,
expose_outer: bool,
/// indicates which users have subscribed to topic (boolean says if as publisher or not)
/// indicates which users have opened the topic (boolean says if as publisher or not)
users: HashMap<UserId, bool>,
}
@ -47,6 +46,8 @@ struct RepoInfo {
struct OverlayInfo {
inner: Option<OverlayId>,
overlay_topic: Option<TopicId>,
topics: HashMap<TopicId, TopicInfo>,
repos: HashMap<RepoHash, RepoInfo>,

@ -24,7 +24,7 @@ use serde_bare::{from_slice, to_vec};
pub struct Account<'a> {
/// User ID
id: UserId,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
}
impl<'a> Account<'a> {
@ -38,10 +38,10 @@ impl<'a> Account<'a> {
const ALL_CLIENT_PROPERTIES: [u8; 2] = [Self::INFO, Self::LAST_SEEN];
pub fn open(id: &UserId, store: &'a dyn KCVStorage) -> Result<Account<'a>, StorageError> {
pub fn open(id: &UserId, storage: &'a dyn KCVStorage) -> Result<Account<'a>, StorageError> {
let opening = Account {
id: id.clone(),
store,
storage,
};
if !opening.exists() {
return Err(StorageError::NotFound);
@ -51,16 +51,16 @@ impl<'a> Account<'a> {
pub fn create(
id: &UserId,
admin: bool,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Account<'a>, StorageError> {
let acc = Account {
id: id.clone(),
store,
storage,
};
if acc.exists() {
return Err(StorageError::AlreadyExists);
}
store.put(
storage.put(
Self::PREFIX_ACCOUNT,
&to_vec(&id)?,
None,
@ -73,12 +73,12 @@ impl<'a> Account<'a> {
#[allow(deprecated)]
pub fn get_all_users(
admins: bool,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Vec<UserId>, StorageError> {
let size = to_vec(&UserId::nil())?.len();
let mut res: Vec<UserId> = vec![];
for user in
store.get_all_keys_and_values(Self::PREFIX_ACCOUNT, size, vec![], None, &None)?
storage.get_all_keys_and_values(Self::PREFIX_ACCOUNT, size, vec![], None, &None)?
{
let admin: bool = from_slice(&user.1)?;
if admin == admins {
@ -89,7 +89,7 @@ impl<'a> Account<'a> {
Ok(res)
}
pub fn exists(&self) -> bool {
self.store
self.storage
.get(
Self::PREFIX_ACCOUNT,
&to_vec(&self.id).unwrap(),
@ -115,7 +115,7 @@ impl<'a> Account<'a> {
let info_ser = to_vec(info)?;
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
let mut id_and_client = to_vec(&self.id)?;
id_and_client.append(&mut client_key_ser);
if tx
@ -158,7 +158,7 @@ impl<'a> Account<'a> {
}
// pub fn has_client(&self, client: &ClientId) -> Result<(), StorageError> {
// self.store.has_property_value(
// self.storage.has_property_value(
// Self::PREFIX,
// &to_vec(&self.id)?,
// Some(Self::CLIENT),
@ -170,7 +170,7 @@ impl<'a> Account<'a> {
// if !self.exists() {
// return Err(StorageError::BackendError);
// }
// self.store.put(
// self.storage.put(
// Self::PREFIX,
// &to_vec(&self.id)?,
// Some(Self::OVERLAY),
@ -178,7 +178,7 @@ impl<'a> Account<'a> {
// )
// }
// pub fn remove_overlay(&self, overlay: &OverlayId) -> Result<(), StorageError> {
// self.store.del_property_value(
// self.storage.del_property_value(
// Self::PREFIX,
// &to_vec(&self.id)?,
// Some(Self::OVERLAY),
@ -187,7 +187,7 @@ impl<'a> Account<'a> {
// }
// pub fn has_overlay(&self, overlay: &OverlayId) -> Result<(), StorageError> {
// self.store.has_property_value(
// self.storage.has_property_value(
// Self::PREFIX,
// &to_vec(&self.id)?,
// Some(Self::OVERLAY),
@ -197,7 +197,7 @@ impl<'a> Account<'a> {
pub fn is_admin(&self) -> Result<bool, StorageError> {
if self
.store
.storage
.has_property_value(
Self::PREFIX_ACCOUNT,
&to_vec(&self.id)?,
@ -213,7 +213,7 @@ impl<'a> Account<'a> {
}
pub fn del(&self) -> Result<(), StorageError> {
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
let id = to_vec(&self.id)?;
// let mut id_and_client = to_vec(&self.id)?;
// let client_key = (client.clone(), hash);
@ -261,14 +261,14 @@ mod test {
let key: [u8; 32] = [0; 32];
fs::create_dir_all(root.path()).unwrap();
println!("{}", root.path().to_str().unwrap());
let mut store = RocksDbKCVStorage::open(root.path(), key).unwrap();
let mut storage = RocksDbKCVStorage::open(root.path(), key).unwrap();
let user_id = PubKey::Ed25519PubKey([1; 32]);
let account = Account::create(&user_id, true, &store).unwrap();
let account = Account::create(&user_id, true, &storage).unwrap();
println!("account created {}", account.id());
let account2 = Account::open(&user_id, &store).unwrap();
let account2 = Account::open(&user_id, &storage).unwrap();
println!("account opened {}", account2.id());
// let client_id = PubKey::Ed25519PubKey([56; 32]);

@ -27,7 +27,7 @@ use serde_bare::to_vec;
pub struct Invitation<'a> {
/// code
id: [u8; 32],
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
}
impl<'a> Invitation<'a> {
@ -45,10 +45,13 @@ impl<'a> Invitation<'a> {
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::TYPE;
pub fn open(id: &[u8; 32], store: &'a dyn KCVStorage) -> Result<Invitation<'a>, StorageError> {
pub fn open(
id: &[u8; 32],
storage: &'a dyn KCVStorage,
) -> Result<Invitation<'a>, StorageError> {
let opening = Invitation {
id: id.clone(),
store,
storage,
};
if !opening.exists() {
return Err(StorageError::NotFound);
@ -59,7 +62,7 @@ impl<'a> Invitation<'a> {
id: &InvitationCode,
expiry: u32,
memo: &Option<String>,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Invitation<'a>, StorageError> {
let (code_type, code) = match id {
InvitationCode::Unique(c) => (0u8, c.slice()),
@ -68,13 +71,13 @@ impl<'a> Invitation<'a> {
};
let acc = Invitation {
id: code.clone(),
store,
storage,
};
if acc.exists() {
return Err(StorageError::BackendError);
}
let mut value = to_vec(&(code_type, expiry, memo.clone()))?;
store.write_transaction(&mut |tx| {
storage.write_transaction(&mut |tx| {
tx.put(
Self::PREFIX,
&to_vec(code)?,
@ -88,7 +91,7 @@ impl<'a> Invitation<'a> {
}
pub fn get_all_invitations(
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
mut admin: bool,
mut unique: bool,
mut multi: bool,
@ -100,7 +103,7 @@ impl<'a> Invitation<'a> {
unique = true;
multi = true;
}
for invite in store.get_all_keys_and_values(Self::PREFIX, size, vec![], None, &None)? {
for invite in storage.get_all_keys_and_values(Self::PREFIX, size, vec![], None, &None)? {
if invite.0.len() == size + 2 {
let code: [u8; 32] = from_slice(&invite.0[1..invite.0.len() - 1])?;
if invite.0[size + 1] == Self::TYPE {
@ -139,7 +142,7 @@ impl<'a> Invitation<'a> {
}
pub fn exists(&self) -> bool {
self.store
self.storage
.get(
Self::PREFIX,
&to_vec(&self.id).unwrap(),
@ -153,9 +156,9 @@ impl<'a> Invitation<'a> {
}
pub fn get_type(&self) -> Result<u8, ProtocolError> {
let type_ser = self
.store
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::TYPE), &None)?;
let type_ser =
self.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::TYPE), &None)?;
let t: (u8, u32, Option<String>) = from_slice(&type_ser)?;
// if t.1 < now_timestamp() {
// return Err(ProtocolError::Expired);
@ -165,7 +168,7 @@ impl<'a> Invitation<'a> {
pub fn is_expired(&self) -> Result<bool, StorageError> {
let expire_ser =
self.store
self.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::TYPE), &None)?;
let expire: (u8, u32, Option<String>) = from_slice(&expire_ser)?;
if expire.1 < now_timestamp() {
@ -175,7 +178,7 @@ impl<'a> Invitation<'a> {
}
pub fn del(&self) -> Result<(), StorageError> {
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
tx.del_all(
Self::PREFIX,
&to_vec(&self.id)?,

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Broker Wallet, persists to store all the SymKeys needed to open other storages
//! Broker Wallet, persists to storage all the SymKeys needed to open other storages
use ng_net::types::*;
use ng_repo::errors::StorageError;
@ -19,7 +19,7 @@ use serde::{Deserialize, Serialize};
use serde_bare::{from_slice, to_vec};
pub struct Wallet<'a> {
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
}
impl<'a> Wallet<'a> {
@ -39,8 +39,8 @@ impl<'a> Wallet<'a> {
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::SYM_KEY;
pub fn open(store: &'a dyn KCVStorage) -> Wallet<'a> {
Wallet { store }
pub fn open(storage: &'a dyn KCVStorage) -> Wallet<'a> {
Wallet { storage }
}
pub fn get_or_create_single_key(
&self,
@ -48,7 +48,7 @@ impl<'a> Wallet<'a> {
key: &Vec<u8>,
) -> Result<SymKey, StorageError> {
let mut result: Option<SymKey> = None;
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
let got = tx.get(prefix, key, Some(Self::SUFFIX_FOR_EXIST_CHECK), &None);
match got {
Err(e) => {
@ -92,7 +92,7 @@ impl<'a> Wallet<'a> {
Ok(symkey)
}
pub fn exists_single_key(&self, prefix: u8, key: &Vec<u8>) -> bool {
self.store
self.storage
.get(prefix, key, Some(Self::SUFFIX_FOR_EXIST_CHECK), &None)
.is_ok()
}
@ -102,7 +102,7 @@ impl<'a> Wallet<'a> {
}
pub fn create_accounts_key(&self) -> Result<SymKey, StorageError> {
let mut result: Option<SymKey> = None;
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
let res = Self::create_single_key(tx, Self::PREFIX, &Self::KEY_ACCOUNTS.to_vec())?;
result = Some(res);
Ok(())

@ -27,7 +27,7 @@ pub struct OverlayMeta {
pub struct Overlay<'a> {
/// Overlay ID
id: OverlayId,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
}
impl<'a> Overlay<'a> {
@ -50,10 +50,10 @@ impl<'a> Overlay<'a> {
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::SECRET;
pub fn open(id: &OverlayId, store: &'a dyn KCVStorage) -> Result<Overlay<'a>, StorageError> {
pub fn open(id: &OverlayId, storage: &'a dyn KCVStorage) -> Result<Overlay<'a>, StorageError> {
let opening = Overlay {
id: id.clone(),
store,
storage,
};
if !opening.exists() {
return Err(StorageError::NotFound);
@ -64,16 +64,16 @@ impl<'a> Overlay<'a> {
id: &OverlayId,
secret: &SymKey,
repo: Option<PubKey>,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Overlay<'a>, StorageError> {
let acc = Overlay {
id: id.clone(),
store,
storage,
};
if acc.exists() {
return Err(StorageError::BackendError);
}
store.write_transaction(&mut |tx| {
storage.write_transaction(&mut |tx| {
tx.put(
Self::PREFIX,
&to_vec(&id)?,
@ -106,7 +106,7 @@ impl<'a> Overlay<'a> {
Ok(acc)
}
pub fn exists(&self) -> bool {
self.store
self.storage
.get(
Self::PREFIX,
&to_vec(&self.id).unwrap(),
@ -122,7 +122,7 @@ impl<'a> Overlay<'a> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.put(
self.storage.put(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::PEER),
@ -131,7 +131,7 @@ impl<'a> Overlay<'a> {
)
}
pub fn remove_peer(&self, peer: &PeerId) -> Result<(), StorageError> {
self.store.del_property_value(
self.storage.del_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::PEER),
@ -141,7 +141,7 @@ impl<'a> Overlay<'a> {
}
pub fn has_peer(&self, peer: &PeerId) -> Result<(), StorageError> {
self.store.has_property_value(
self.storage.has_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::PEER),
@ -154,7 +154,7 @@ impl<'a> Overlay<'a> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.put(
self.storage.put(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::TOPIC),
@ -163,7 +163,7 @@ impl<'a> Overlay<'a> {
)
}
pub fn remove_topic(&self, topic: &TopicId) -> Result<(), StorageError> {
self.store.del_property_value(
self.storage.del_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::TOPIC),
@ -173,7 +173,7 @@ impl<'a> Overlay<'a> {
}
pub fn has_topic(&self, topic: &TopicId) -> Result<(), StorageError> {
self.store.has_property_value(
self.storage.has_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::TOPIC),
@ -184,7 +184,7 @@ impl<'a> Overlay<'a> {
pub fn secret(&self) -> Result<SymKey, StorageError> {
match self
.store
.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::SECRET), &None)
{
Ok(secret) => Ok(from_slice::<SymKey>(&secret)?),
@ -194,7 +194,7 @@ impl<'a> Overlay<'a> {
pub fn metadata(&self) -> Result<OverlayMeta, StorageError> {
match self
.store
.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::META), &None)
{
Ok(meta) => Ok(from_slice::<OverlayMeta>(&meta)?),
@ -205,7 +205,7 @@ impl<'a> Overlay<'a> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.replace(
self.storage.replace(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::META),
@ -216,7 +216,7 @@ impl<'a> Overlay<'a> {
pub fn repo(&self) -> Result<PubKey, StorageError> {
match self
.store
.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::REPO), &None)
{
Ok(repo) => Ok(from_slice::<PubKey>(&repo)?),
@ -225,7 +225,7 @@ impl<'a> Overlay<'a> {
}
pub fn del(&self) -> Result<(), StorageError> {
self.store.del_all(
self.storage.del_all(
Self::PREFIX,
&to_vec(&self.id)?,
&Self::ALL_PROPERTIES,

@ -19,7 +19,7 @@ use serde_bare::{from_slice, to_vec};
pub struct Peer<'a> {
/// Topic ID
id: PeerId,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
}
impl<'a> Peer<'a> {
@ -33,10 +33,10 @@ impl<'a> Peer<'a> {
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::VERSION;
pub fn open(id: &PeerId, store: &'a dyn KCVStorage) -> Result<Peer<'a>, StorageError> {
pub fn open(id: &PeerId, storage: &'a dyn KCVStorage) -> Result<Peer<'a>, StorageError> {
let opening = Peer {
id: id.clone(),
store,
storage,
};
if !opening.exists() {
return Err(StorageError::NotFound);
@ -45,13 +45,13 @@ impl<'a> Peer<'a> {
}
pub fn update_or_create(
advert: &PeerAdvert,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Peer<'a>, StorageError> {
let id = advert.peer();
match Self::open(id, store) {
match Self::open(id, storage) {
Err(e) => {
if e == StorageError::NotFound {
Self::create(advert, store)
Self::create(advert, storage)
} else {
Err(StorageError::BackendError)
}
@ -64,17 +64,17 @@ impl<'a> Peer<'a> {
}
pub fn create(
advert: &PeerAdvert,
store: &'a dyn KCVStorage,
storage: &'a dyn KCVStorage,
) -> Result<Peer<'a>, StorageError> {
let id = advert.peer();
let acc = Peer {
id: id.clone(),
store,
storage,
};
if acc.exists() {
return Err(StorageError::BackendError);
}
store.write_transaction(&mut |tx| {
storage.write_transaction(&mut |tx| {
tx.put(
Self::PREFIX,
&to_vec(&id)?,
@ -94,7 +94,7 @@ impl<'a> Peer<'a> {
Ok(acc)
}
pub fn exists(&self) -> bool {
self.store
self.storage
.get(
Self::PREFIX,
&to_vec(&self.id).unwrap(),
@ -108,7 +108,7 @@ impl<'a> Peer<'a> {
}
pub fn version(&self) -> Result<u32, StorageError> {
match self
.store
.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::VERSION), &None)
{
Ok(ver) => Ok(from_slice::<u32>(&ver)?),
@ -119,7 +119,7 @@ impl<'a> Peer<'a> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.replace(
self.storage.replace(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::VERSION),
@ -135,7 +135,7 @@ impl<'a> Peer<'a> {
if current_advert.version() >= advert.version() {
return Ok(());
}
self.store.write_transaction(&mut |tx| {
self.storage.write_transaction(&mut |tx| {
tx.replace(
Self::PREFIX,
&to_vec(&self.id)?,
@ -155,7 +155,7 @@ impl<'a> Peer<'a> {
}
pub fn advert(&self) -> Result<PeerAdvert, StorageError> {
match self
.store
.storage
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::ADVERT), &None)
{
Ok(advert) => Ok(from_slice::<PeerAdvert>(&advert)?),
@ -166,7 +166,7 @@ impl<'a> Peer<'a> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.replace(
self.storage.replace(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::ADVERT),
@ -176,7 +176,7 @@ impl<'a> Peer<'a> {
}
pub fn del(&self) -> Result<(), StorageError> {
self.store.del_all(
self.storage.del_all(
Self::PREFIX,
&to_vec(&self.id)?,
&Self::ALL_PROPERTIES,

@ -9,138 +9,151 @@
//! Topic
use std::collections::HashMap;
use std::collections::HashSet;
use ng_net::types::*;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::kcv_storage::*;
use ng_repo::types::*;
use serde::{Deserialize, Serialize};
use serde_bare::{from_slice, to_vec};
// TODO: versioning V0
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct TopicMeta {
pub users: u32,
}
use serde_bare::to_vec;
pub struct Topic<'a> {
/// Topic ID
id: TopicId,
store: &'a dyn KCVStorage,
key: Vec<u8>,
repo: ExistentialValue<RepoHash>,
storage: &'a dyn KCVStorage,
}
impl<'a> IModel for Topic<'a> {
fn key(&self) -> &Vec<u8> {
&self.key
}
fn storage(&self) -> &dyn KCVStorage {
self.storage
}
fn class(&self) -> &Class {
&Self::CLASS
}
fn existential(&mut self) -> &mut dyn IExistentialValue {
&mut self.repo
}
}
impl<'a> Topic<'a> {
const PREFIX: u8 = b"t"[0];
const PREFIX: u8 = b't';
// propertie's suffixes
const ADVERT: u8 = b"a"[0];
const HEAD: u8 = b"h"[0];
const META: u8 = b"m"[0];
// Topic properties
const ADVERT: SingleValueColumn<Self, PublisherAdvert> = SingleValueColumn::new(b'a');
const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r');
const ROOT_COMMIT: SingleValueColumn<Self, ObjectId> = SingleValueColumn::new(b'o');
const ALL_PROPERTIES: [u8; 3] = [Self::ADVERT, Self::HEAD, Self::META];
// Topic <-> Users who pinned it
pub const USERS: MultiValueColumn<Self, UserId> = MultiValueColumn::new(b'u');
// Topic <-> heads
pub const HEADS: MultiValueColumn<Self, ObjectId> = MultiValueColumn::new(b'h');
const SUFFIX_FOR_EXIST_CHECK: u8 = Self::META;
const CLASS: Class<'a> = Class::new(
Self::PREFIX,
&Self::REPO,
vec![&Self::ADVERT, &Self::ROOT_COMMIT],
vec![&Self::USERS, &Self::HEADS],
);
pub fn load(&self) -> Result<(), StorageError> {
let props = self.load_props()?;
// let bs = BranchInfo {
// id: id.clone(),
// branch_type: prop(Self::TYPE, &props)?,
// read_cap: prop(Self::READ_CAP, &props)?,
// topic: prop(Self::TOPIC, &props)?,
// topic_priv_key: prop(Self::PUBLISHER, &props).ok(),
// current_heads: Self::get_all_heads(id, storage)?,
// };
// Ok(bs)
Ok(())
}
pub fn open(id: &TopicId, store: &'a dyn KCVStorage) -> Result<Topic<'a>, StorageError> {
let opening = Topic {
id: id.clone(),
store,
};
if !opening.exists() {
return Err(StorageError::NotFound);
pub fn new(id: &TopicId, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self {
let mut key: Vec<u8> = Vec::with_capacity(33 + 33);
key.append(&mut to_vec(overlay).unwrap());
key.append(&mut to_vec(id).unwrap());
Topic {
key,
repo: ExistentialValue::<RepoHash>::new(),
storage,
}
}
pub fn open(
id: &TopicId,
overlay: &OverlayId,
storage: &'a dyn KCVStorage,
) -> Result<Topic<'a>, StorageError> {
let mut opening = Topic::new(id, overlay, storage);
opening.check_exists()?;
Ok(opening)
}
pub fn create(id: &TopicId, store: &'a mut dyn KCVStorage) -> Result<Topic<'a>, StorageError> {
let acc = Topic {
id: id.clone(),
store,
};
if acc.exists() {
return Err(StorageError::BackendError);
}
let meta = TopicMeta { users: 0 };
store.put(
Self::PREFIX,
&to_vec(&id)?,
Some(Self::META),
&to_vec(&meta)?,
&None,
)?;
Ok(acc)
}
pub fn exists(&self) -> bool {
self.store
.get(
Self::PREFIX,
&to_vec(&self.id).unwrap(),
Some(Self::SUFFIX_FOR_EXIST_CHECK),
&None,
)
.is_ok()
}
pub fn id(&self) -> TopicId {
self.id
}
pub fn add_head(&self, head: &ObjectId) -> Result<(), StorageError> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.put(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::HEAD),
&to_vec(head)?,
&None,
)
}
pub fn remove_head(&self, head: &ObjectId) -> Result<(), StorageError> {
self.store.del_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::HEAD),
&to_vec(head)?,
&None,
)
}
pub fn has_head(&self, head: &ObjectId) -> Result<(), StorageError> {
self.store.has_property_value(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::HEAD),
&to_vec(head)?,
&None,
)
}
pub fn metadata(&self) -> Result<TopicMeta, StorageError> {
match self
.store
.get(Self::PREFIX, &to_vec(&self.id)?, Some(Self::META), &None)
{
Ok(meta) => Ok(from_slice::<TopicMeta>(&meta)?),
Err(e) => Err(e),
pub fn create(
id: &TopicId,
overlay: &OverlayId,
repo: &RepoHash,
storage: &'a mut dyn KCVStorage,
) -> Result<Topic<'a>, StorageError> {
let mut topic = Topic::new(id, overlay, storage);
if topic.exists() {
return Err(StorageError::AlreadyExists);
}
topic.repo.set(repo, &topic)?;
Ok(topic)
}
pub fn set_metadata(&self, meta: &TopicMeta) -> Result<(), StorageError> {
if !self.exists() {
return Err(StorageError::BackendError);
}
self.store.replace(
Self::PREFIX,
&to_vec(&self.id)?,
Some(Self::META),
&to_vec(meta)?,
&None,
)
}
pub fn del(&self) -> Result<(), StorageError> {
self.store.del_all(
Self::PREFIX,
&to_vec(&self.id)?,
&Self::ALL_PROPERTIES,
&None,
)
pub fn repo_hash(&self) -> &RepoHash {
self.repo.get().unwrap()
}
pub fn root_commit(&mut self) -> Result<ObjectId, StorageError> {
Self::ROOT_COMMIT.get(self)
}
pub fn set_root_commit(&mut self, commit: &ObjectId) -> Result<(), StorageError> {
Self::ROOT_COMMIT.set(self, commit)
}
pub fn publisher_advert(&mut self) -> Result<PublisherAdvert, StorageError> {
Self::ADVERT.get(self)
}
pub fn set_publisher_advert(&mut self, advert: &PublisherAdvert) -> Result<(), StorageError> {
Self::ADVERT.set(self, advert)
}
pub fn add_head(&mut self, head: &ObjectId) -> Result<(), StorageError> {
Self::HEADS.add(self, head)
}
pub fn remove_head(&mut self, head: &ObjectId) -> Result<(), StorageError> {
Self::HEADS.remove(self, head)
}
pub fn has_head(&mut self, head: &ObjectId) -> Result<(), StorageError> {
Self::HEADS.has(self, head)
}
pub fn get_all_heads(&mut self) -> Result<HashSet<ObjectId>, StorageError> {
Self::HEADS.get_all(self)
}
pub fn add_user(&mut self, user: &UserId) -> Result<(), StorageError> {
Self::USERS.add(self, user)
}
pub fn remove_user(&mut self, user: &UserId) -> Result<(), StorageError> {
Self::USERS.remove(self, user)
}
pub fn has_user(&mut self, user: &UserId) -> Result<(), StorageError> {
Self::USERS.has(self, user)
}
pub fn get_all_users(&mut self) -> Result<HashSet<UserId>, StorageError> {
Self::USERS.get_all(self)
}
}

@ -19,7 +19,7 @@ use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
use super::super::StartProtocol;
/// Add invitation
#[derive(Clone, Debug, Serialize, Deserialize)]

@ -19,7 +19,7 @@ use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
use super::super::StartProtocol;
/// Add user account
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]

@ -18,7 +18,7 @@ use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
use super::super::StartProtocol;
/// Delete user account V0
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]

@ -19,7 +19,7 @@ use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
use super::super::StartProtocol;
/// List invitations registered on this broker
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]

@ -19,7 +19,7 @@ use ng_repo::types::PubKey;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use super::StartProtocol;
use super::super::StartProtocol;
/// List users registered on this broker
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]

@ -0,0 +1,14 @@
pub mod add_user;
pub use add_user::*;
pub mod del_user;
pub use del_user::*;
pub mod list_users;
pub use list_users::*;
pub mod add_invitation;
pub use add_invitation::*;
pub mod list_invitations;
pub use list_invitations::*;

@ -9,22 +9,9 @@ pub use start::*;
pub mod probe;
pub use probe::*;
pub mod add_user;
pub use add_user::*;
pub mod del_user;
pub use del_user::*;
pub mod list_users;
pub use list_users::*;
pub mod add_invitation;
pub use add_invitation::*;
pub mod list_invitations;
pub use list_invitations::*;
pub mod connecting;
pub use connecting::*;
pub mod client;
pub mod admin;

@ -16,7 +16,7 @@ use crate::utils::{
is_public_ipv4, is_public_ipv6,
};
use crate::WS_PORT_ALTERNATE;
use crate::{actor::EActor, actors::*};
use crate::{actor::EActor, actors::admin::*, actors::*};
use core::fmt;
use ng_repo::errors::*;
use ng_repo::log::*;

@ -6,13 +6,346 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! KeyColumnValue Store abstraction
//! KeyColumnValue Storage abstraction
use std::collections::HashMap;
use std::{collections::HashSet, marker::PhantomData};
use crate::errors::StorageError;
use serde::{Deserialize, Serialize};
use serde_bare::{from_slice, to_vec};
// TODO:remove mut on self for trait WriteTransaction methods
pub fn prop<A>(prop: u8, props: &HashMap<u8, Vec<u8>>) -> Result<A, StorageError>
where
A: for<'a> Deserialize<'a>,
{
Ok(from_slice(
&props.get(&prop).ok_or(StorageError::PropertyNotFound)?,
)?)
}
pub struct Class<'a> {
columns: Vec<&'a dyn ISingleValueColumn>,
multi_value_columns: Vec<&'a dyn IMultiValueColumn>,
existential_column: &'a dyn ISingleValueColumn,
prefix: u8,
}
impl<'a> Class<'a> {
pub fn new(
prefix: u8,
existential_column: &'a dyn ISingleValueColumn,
columns: Vec<&'a dyn ISingleValueColumn>,
multi_value_columns: Vec<&'a dyn IMultiValueColumn>,
) -> Self {
// check unicity of prefixes and suffixes
#[cfg(test)]
{
let mut prefixes = HashSet::from([prefix]);
let mut suffixes = HashSet::from([existential_column.suffix()]);
for column in columns.iter() {
if !suffixes.insert(column.suffix()) {
panic!("duplicate suffix {} !!! check the code", column.suffix());
}
}
for mvc in multi_value_columns.iter() {
if !prefixes.insert(mvc.prefix()) {
panic!("duplicate prefix {} !!! check the code", mvc.prefix());
}
}
}
Self {
columns,
multi_value_columns,
prefix,
existential_column,
}
}
fn suffices(&self) -> Vec<u8> {
let mut res: Vec<u8> = self.columns.iter().map(|c| c.suffix()).collect();
res.push(self.existential_column.suffix());
res
}
}
pub trait IModel {
fn key(&self) -> &Vec<u8>;
fn prefix(&self) -> u8 {
self.class().prefix
}
fn check_exists(&mut self) -> Result<(), StorageError> {
if !self.exists() {
return Err(StorageError::NotFound);
}
Ok(())
}
fn existential(&mut self) -> &mut dyn IExistentialValue;
fn exists(&mut self) -> bool {
if self.existential().exists() {
return true;
}
let prefix = self.prefix();
let key = self.key();
let suffix = self.class().existential_column.suffix();
match self.storage().get(prefix, key, Some(suffix), &None) {
Ok(res) => {
self.existential().process_exists(res);
true
}
Err(e) => false,
}
}
fn storage(&self) -> &dyn KCVStorage;
fn load_props(&self) -> Result<HashMap<u8, Vec<u8>>, StorageError> {
self.storage().get_all_properties_of_key(
self.prefix(),
self.key().to_vec(),
self.class().suffices(),
&None,
)
}
fn class(&self) -> &Class;
fn del(&self) -> Result<(), StorageError> {
self.storage().write_transaction(&mut |tx| {
tx.del_all(self.prefix(), self.key(), &self.class().suffices(), &None)?;
for mvc in self.class().multi_value_columns.iter() {
let size = mvc.value_size()?;
tx.del_all_values(self.prefix(), self.key(), size, None, &None)?;
}
Ok(())
})?;
Ok(())
}
}
use std::hash::Hash;
pub struct MultiValueColumn<
Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'a> Deserialize<'a>,
> {
prefix: u8,
phantom: PhantomData<Column>,
model: PhantomData<Model>,
}
impl<
Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>,
> MultiValueColumn<Model, Column>
{
pub fn new(prefix: u8) -> Self {
MultiValueColumn {
prefix,
phantom: PhantomData,
model: PhantomData,
}
}
pub fn add(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
model
.storage()
.put(self.prefix, model.key(), None, &to_vec(column)?, &None)
}
pub fn remove(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
model
.storage()
.del_property_value(self.prefix, model.key(), None, &to_vec(column)?, &None)
}
pub fn has(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
model
.storage()
.has_property_value(self.prefix, model.key(), None, &to_vec(column)?, &None)
}
pub fn get_all(&self, model: &mut Model) -> Result<HashSet<Column>, StorageError> {
model.check_exists()?;
let key_prefix = model.key();
let value_size = to_vec(&Column::default())?.len();
let mut res: HashSet<Column> = HashSet::new();
let total_size = key_prefix.len() + value_size;
for user in model.storage().get_all_keys_and_values(
self.prefix,
total_size,
key_prefix.to_vec(),
None,
&None,
)? {
if user.0.len() == total_size + 1 {
let user: Column = from_slice(&user.0[1..user.0.len()])?;
res.insert(user);
}
}
Ok(res)
}
}
impl<
Model: IModel,
Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>,
> IMultiValueColumn for MultiValueColumn<Model, Column>
{
fn value_size(&self) -> Result<usize, StorageError> {
Ok(to_vec(&Column::default())?.len())
}
fn prefix(&self) -> u8 {
self.prefix
}
}
pub trait ISingleValueColumn {
fn suffix(&self) -> u8;
}
pub trait IMultiValueColumn {
fn prefix(&self) -> u8;
fn value_size(&self) -> Result<usize, StorageError>;
}
pub struct SingleValueColumn<Model: IModel, Column: Serialize + for<'a> Deserialize<'a>> {
suffix: u8,
phantom: PhantomData<Column>,
model: PhantomData<Model>,
}
impl<Model: IModel, Column: Serialize + for<'d> Deserialize<'d>> ISingleValueColumn
for SingleValueColumn<Model, Column>
{
fn suffix(&self) -> u8 {
self.suffix
}
}
pub struct ExistentialValueColumn {
suffix: u8,
}
impl ISingleValueColumn for ExistentialValueColumn {
fn suffix(&self) -> u8 {
self.suffix
}
}
impl ExistentialValueColumn {
pub fn new(suffix: u8) -> Self {
ExistentialValueColumn { suffix }
}
}
impl<Model: IModel, Column: Serialize + for<'d> Deserialize<'d>> SingleValueColumn<Model, Column> {
pub fn new(suffix: u8) -> Self {
SingleValueColumn {
suffix,
phantom: PhantomData,
model: PhantomData,
}
}
pub fn set(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
model.storage().replace(
model.prefix(),
model.key(),
Some(self.suffix),
&to_vec(column)?,
&None,
)
}
pub fn get(&self, model: &mut Model) -> Result<Column, StorageError> {
model.check_exists()?;
match model
.storage()
.get(model.prefix(), model.key(), Some(self.suffix), &None)
{
Ok(res) => Ok(from_slice::<Column>(&res)?),
Err(e) => Err(e),
}
}
pub fn has(&self, model: &mut Model, column: &Column) -> Result<(), StorageError> {
model.check_exists()?;
model.storage().has_property_value(
model.prefix(),
model.key(),
Some(self.suffix),
&to_vec(column)?,
&None,
)
}
pub fn del(
&self,
model: &mut Model,
tx: &mut dyn WriteTransaction,
) -> Result<(), StorageError> {
tx.del(model.prefix(), model.key(), Some(self.suffix), &None)
}
}
pub struct ExistentialValue<Column: Serialize + for<'d> Deserialize<'d>> {
value: Option<Column>,
value_ser: Vec<u8>,
}
pub trait IExistentialValue {
fn process_exists(&mut self, value_ser: Vec<u8>);
fn exists(&self) -> bool;
}
impl<Column: Serialize + for<'d> Deserialize<'d>> IExistentialValue for ExistentialValue<Column> {
fn exists(&self) -> bool {
self.value.is_some() || self.value_ser.len() > 0
}
fn process_exists(&mut self, value_ser: Vec<u8>) {
self.value_ser = value_ser;
}
}
impl<Column: Clone + Serialize + for<'d> Deserialize<'d>> ExistentialValue<Column> {
pub fn new() -> Self {
ExistentialValue {
value: None,
value_ser: vec![],
}
}
pub fn set<Model: IModel>(
&mut self,
value: &Column,
model: &Model,
) -> Result<(), StorageError> {
if self.value.is_some() {
return Err(StorageError::AlreadyExists);
}
model.storage().replace(
model.prefix(),
model.key(),
Some(model.class().existential_column.suffix()),
&to_vec(value)?,
&None,
)?;
self.value = Some(value.clone());
Ok(())
}
pub fn get(&mut self) -> Result<&Column, StorageError> {
if self.value.is_some() {
return Ok(self.value.as_ref().unwrap());
}
if self.value_ser.len() == 0 {
return Err(StorageError::BackendError);
}
let value = from_slice::<Column>(&self.value_ser);
match value {
Err(_) => return Err(StorageError::InvalidValue),
Ok(val) => {
self.value = Some(val);
return Ok(self.value.as_ref().unwrap());
}
}
}
}
pub trait WriteTransaction: ReadTransaction {
/// Save a property value to the store.

@ -137,6 +137,12 @@ pub enum PubKey {
X25519PubKey(X25519PubKey),
}
impl Default for PubKey {
fn default() -> Self {
Self::nil()
}
}
impl PubKey {
pub fn slice(&self) -> &[u8; 32] {
match self {
@ -398,6 +404,12 @@ pub struct BlockRef {
pub key: BlockKey,
}
impl Default for BlockId {
fn default() -> Self {
Self::nil()
}
}
impl BlockId {
#[cfg(any(test, feature = "testing"))]
pub fn dummy() -> Self {

@ -19,6 +19,7 @@ use ng_net::types::*;
use ng_repo::block_storage::BlockStorage;
use ng_repo::errors::ProtocolError;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::prop;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::repo::BranchInfo;
use ng_repo::repo::Repo;
@ -36,8 +37,6 @@ use ng_repo::types::TopicId;
use serde_bare::from_slice;
use serde_bare::to_vec;
use super::prop;
pub struct BranchStorage<'a> {
storage: &'a dyn KCVStorage,
id: BranchId,
@ -87,6 +86,8 @@ impl<'a> BranchStorage<'a> {
)
}
//TODO: save all branch info under the repo_id (key prefix should be repo_id)
pub fn create(
id: &BranchId,
read_cap: &ReadCap,

@ -14,16 +14,3 @@ pub use storage::*;
pub mod repo;
pub mod branch;
use ng_repo::errors::StorageError;
use serde::Deserialize;
use serde_bare::from_slice;
use std::collections::HashMap;
pub(crate) fn prop<A>(prop: u8, props: &HashMap<u8, Vec<u8>>) -> Result<A, StorageError>
where
A: for<'a> Deserialize<'a>,
{
Ok(from_slice(
&props.get(&prop).ok_or(StorageError::PropertyNotFound)?,
)?)
}

@ -21,6 +21,7 @@ use ng_net::types::*;
use ng_repo::block_storage::BlockStorage;
use ng_repo::errors::ProtocolError;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::prop;
use ng_repo::kcv_storage::KCVStorage;
use ng_repo::log::*;
use ng_repo::repo::BranchInfo;
@ -43,7 +44,6 @@ use serde_bare::to_vec;
use std::sync::{Arc, RwLock};
use super::branch::BranchStorage;
use super::prop;
pub struct RepoStorage<'a> {
storage: &'a dyn KCVStorage,

@ -13,7 +13,7 @@ mod types;
use duration_str::parse;
use ng_client_ws::remote_ws::ConnectionWebSocket;
use ng_net::actors::add_invitation::*;
use ng_net::actors::admin::add_invitation::*;
use ng_net::broker::BROKER;
use serde::{Deserialize, Serialize};
use warp::http::header::{HeaderMap, HeaderValue};

@ -12,7 +12,7 @@ use ed25519_dalek::*;
use core::fmt;
use duration_str::parse;
use futures::{future, pin_mut, stream, SinkExt, StreamExt};
use ng_net::actors::*;
use ng_net::actors::admin::*;
use ng_repo::block_storage::{
store_max_value_size, store_valid_value_size, BlockStorage, HashMapBlockStorage,
};

Loading…
Cancel
Save