RepoOKM and checking the OKM config in debug

pull/19/head
Niko PLP 8 months ago
parent 4ba34c7300
commit 2aa996f01e
  1. 21
      ng-broker/src/rocksdb_server_storage.rs
  2. 18
      ng-broker/src/server_broker.rs
  3. 2
      ng-broker/src/server_storage/admin/account.rs
  4. 2
      ng-broker/src/server_storage/admin/invitation.rs
  5. 2
      ng-broker/src/server_storage/admin/wallet.rs
  6. 6
      ng-broker/src/server_storage/core/mod.rs
  7. 2
      ng-broker/src/server_storage/core/overlay.rs
  8. 95
      ng-broker/src/server_storage/core/repo.rs
  9. 34
      ng-broker/src/server_storage/core/topic.rs
  10. 172
      ng-repo/src/kcv_storage.rs
  11. 31
      ng-storage-rocksdb/src/kcv_storage.rs
  12. 2
      ng-verifier/src/user_storage/branch.rs
  13. 2
      ng-verifier/src/user_storage/repo.rs

@ -18,6 +18,7 @@ use std::sync::Mutex;
use crate::server_storage::admin::account::Account;
use crate::server_storage::admin::invitation::Invitation;
use crate::server_storage::admin::wallet::Wallet;
use crate::server_storage::core::*;
use crate::types::*;
use ng_net::server_broker::*;
use ng_net::types::*;
@ -50,7 +51,7 @@ impl RocksDbServerStorage {
std::fs::create_dir_all(wallet_path.clone()).unwrap();
log_debug!("opening wallet DB");
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let wallet_storage = RocksDbKCVStorage::open(&wallet_path, master_key.slice().clone())?;
let mut wallet_storage = RocksDbKCVStorage::open(&wallet_path, master_key.slice().clone())?;
let wallet = Wallet::open(&wallet_storage);
// create/open the ACCOUNTS storage
@ -89,7 +90,7 @@ impl RocksDbServerStorage {
log_debug!("opening accounts DB");
std::fs::create_dir_all(accounts_path.clone()).unwrap();
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let accounts_storage =
let mut accounts_storage =
RocksDbKCVStorage::open(&accounts_path, accounts_key.slice().clone())?;
// create/open the PEERS storage
@ -120,7 +121,21 @@ impl RocksDbServerStorage {
core_path.push("core");
std::fs::create_dir_all(core_path.clone()).unwrap();
//TODO redo the whole key passing mechanism in RKV so it uses zeroize all the way
let core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?;
let mut core_storage = RocksDbKCVStorage::open(&core_path, core_key.slice().clone())?;
// check unicity of class prefixes, by storage
#[cfg(debug_assertions)]
{
//log_debug!("CHECKING...");
// wallet_storage.add_class(&Wallet::CLASS);
// wallet_storage.check_prefixes();
// accounts_storage.add_class(&Account::CLASS);
// accounts_storage.add_class(&Invitation::CLASS);
// accounts_storage.check_prefixes();
core_storage.add_class(&Topic::CLASS);
core_storage.add_class(&RepoOKM::CLASS);
core_storage.check_prefixes();
}
Ok(RocksDbServerStorage {
wallet_storage,

@ -22,28 +22,28 @@ use ng_repo::{
use crate::rocksdb_server_storage::RocksDbServerStorage;
pub struct TopicInfo {
repo: RepoHash,
pub repo: RepoHash,
publisher_advert: Option<PublisherAdvert>,
pub publisher_advert: Option<PublisherAdvert>,
current_heads: HashSet<ObjectId>,
pub current_heads: HashSet<ObjectId>,
root_commit: Option<ObjectId>,
pub root_commit: Option<ObjectId>,
/// indicates which users have opened the topic (boolean says if as publisher or not)
users: HashMap<UserId, bool>,
pub users: HashMap<UserId, bool>,
}
struct RepoInfo {
pub struct RepoInfo {
/// set of users that requested the repo to be exposed on the outer overlay
/// only possible if the user is a publisher
expose_outer: HashSet<UserId>,
pub expose_outer: HashSet<UserId>,
/// set of topics of this repo
topics: HashSet<TopicId>,
pub topics: HashSet<TopicId>,
}
struct OverlayInfo {
pub struct OverlayInfo {
inner: Option<OverlayId>,
overlay_topic: Option<TopicId>,

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! User account
//! User account OKM (Object Key/Col/Value Mapping)
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! User account
//! User account OKM (Object Key/Col/Value Mapping)
use std::collections::hash_map::DefaultHasher;
use std::hash::Hash;

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Broker Wallet, persists to storage all the SymKeys needed to open other storages
//! Broker Wallet OKM (Object Key/Col/Value Mapping), persists to storage all the SymKeys needed to open other storages
use ng_net::types::*;
use ng_repo::errors::StorageError;

@ -1,5 +1,11 @@
pub mod overlay;
pub use overlay::*;
pub mod peer;
pub use peer::*;
pub mod topic;
pub use topic::*;
pub mod repo;
pub use repo::*;

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Overlay
//! Overlay OKM (Object Key/Col/Value Mapping)
use ng_net::types::*;
use ng_repo::errors::StorageError;

@ -0,0 +1,95 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Repo OKM (Object Key/Col/Value Mapping)
use std::collections::HashMap;
use std::collections::HashSet;
use ng_net::types::*;
use ng_repo::errors::StorageError;
use ng_repo::kcv_storage::*;
use ng_repo::types::*;
use serde_bare::to_vec;
use crate::server_broker::RepoInfo;
pub struct RepoOKM<'a> {
key: Vec<u8>,
storage: &'a dyn KCVStorage,
}
impl<'a> IModel for RepoOKM<'a> {
fn key(&self) -> &Vec<u8> {
&self.key
}
fn storage(&self) -> &dyn KCVStorage {
self.storage
}
fn class(&self) -> &Class {
&Self::CLASS
}
fn existential(&mut self) -> Option<&mut dyn IExistentialValue> {
None
}
}
impl<'a> RepoOKM<'a> {
// RepoHash <-> Topic : list of topics of a repo that was pinned on the broker
pub const TOPICS: MultiValueColumn<Self, TopicId> = MultiValueColumn::new(b'r');
// RepoHash <-> User : list of users who asked to expose the repo to the outer overlay
pub const EXPOSE_OUTER: MultiValueColumn<Self, UserId> = MultiValueColumn::new(b'x');
pub const CLASS: Class<'a> = Class::new(
"Repo",
None,
None,
&[],
&[&Self::TOPICS as &dyn IMultiValueColumn, &Self::EXPOSE_OUTER],
);
pub fn load(
repo: &RepoHash,
overlay: &OverlayId,
storage: &'a dyn KCVStorage,
) -> Result<RepoInfo, StorageError> {
let mut opening = Self::new(repo, overlay, storage);
let info = RepoInfo {
topics: Self::TOPICS.get_all(&mut opening)?,
expose_outer: Self::EXPOSE_OUTER.get_all(&mut opening)?,
};
Ok(info)
}
pub fn new(repo: &RepoHash, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self {
let mut key: Vec<u8> = Vec::with_capacity(33 + 33);
key.append(&mut to_vec(overlay).unwrap());
key.append(&mut to_vec(repo).unwrap());
Self { key, storage }
}
pub fn open(
repo: &RepoHash,
overlay: &OverlayId,
storage: &'a dyn KCVStorage,
) -> Result<RepoOKM<'a>, StorageError> {
let mut opening = Self::new(repo, overlay, storage);
Ok(opening)
}
pub fn create(
repo: &RepoHash,
overlay: &OverlayId,
storage: &'a mut dyn KCVStorage,
) -> Result<RepoOKM<'a>, StorageError> {
let mut creating = Self::new(repo, overlay, storage);
Ok(creating)
}
}

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Topic
//! Topic OKM (Object Key/Col/Value Mapping)
use std::collections::HashMap;
use std::collections::HashSet;
@ -37,29 +37,33 @@ impl<'a> IModel for Topic<'a> {
fn class(&self) -> &Class {
&Self::CLASS
}
fn existential(&mut self) -> &mut dyn IExistentialValue {
&mut self.repo
fn existential(&mut self) -> Option<&mut dyn IExistentialValue> {
Some(&mut self.repo)
}
// fn name(&self) -> String {
// format_type_of(self)
// }
}
impl<'a> Topic<'a> {
const PREFIX: u8 = b't';
// Topic properties
const ADVERT: SingleValueColumn<Self, PublisherAdvert> = SingleValueColumn::new(b'a');
const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r');
const ROOT_COMMIT: SingleValueColumn<Self, ObjectId> = SingleValueColumn::new(b'o');
pub const ADVERT: SingleValueColumn<Self, PublisherAdvert> = SingleValueColumn::new(b'a');
pub const REPO: ExistentialValueColumn = ExistentialValueColumn::new(b'r');
pub const ROOT_COMMIT: SingleValueColumn<Self, ObjectId> = SingleValueColumn::new(b'o');
// Topic <-> Users who pinned it (with boolean: R or W)
pub const USERS: MultiMapColumn<Self, UserId, bool> = MultiMapColumn::new(b'u');
// Topic <-> heads
pub const HEADS: MultiValueColumn<Self, ObjectId> = MultiValueColumn::new(b'h');
const CLASS: Class<'a> = Class::new(
Self::PREFIX,
&Self::REPO,
vec![&Self::ADVERT, &Self::ROOT_COMMIT],
vec![&Self::USERS, &Self::HEADS],
pub const CLASS: Class<'a> = Class::new(
"Topic",
Some(Self::PREFIX),
Some(&Self::REPO),
&[&Self::ADVERT as &dyn ISingleValueColumn, &Self::ROOT_COMMIT],
&[&Self::USERS as &dyn IMultiValueColumn, &Self::HEADS],
);
pub fn load(
@ -70,7 +74,8 @@ impl<'a> Topic<'a> {
let mut opening = Topic::new(id, overlay, storage);
let props = opening.load_props()?;
let existential = col(&Self::REPO, &props)?;
opening.repo.set(&existential, &opening)?;
opening.repo.set(&existential)?;
//ExistentialValue::save(&opening, &existential)?;
let ti = TopicInfo {
repo: existential,
publisher_advert: col(&Self::ADVERT, &props).ok(),
@ -111,12 +116,13 @@ impl<'a> Topic<'a> {
if topic.exists() {
return Err(StorageError::AlreadyExists);
}
topic.repo.set(repo, &topic)?;
topic.repo.set(repo)?;
ExistentialValue::save(&topic, repo)?;
Ok(topic)
}
pub fn repo_hash(&self) -> &RepoHash {
pub fn repo_hash(&mut self) -> &RepoHash {
self.repo.get().unwrap()
}

@ -12,6 +12,7 @@ use std::collections::HashMap;
use std::{collections::HashSet, marker::PhantomData};
use crate::errors::StorageError;
use crate::log::*;
use serde::{Deserialize, Serialize};
use serde_bare::{from_slice, to_vec};
@ -39,53 +40,104 @@ where
}
pub struct Class<'a> {
columns: Vec<&'a dyn ISingleValueColumn>,
multi_value_columns: Vec<&'a dyn IMultiValueColumn>,
existential_column: &'a dyn ISingleValueColumn,
prefix: u8,
prefix: Option<u8>,
pub name: &'static str,
existential_column: Option<&'a dyn ISingleValueColumn>,
columns: &'a [&'a dyn ISingleValueColumn],
multi_value_columns: &'a [&'a dyn IMultiValueColumn],
}
impl<'a> Class<'a> {
pub fn new(
prefix: u8,
existential_column: &'a dyn ISingleValueColumn,
columns: Vec<&'a dyn ISingleValueColumn>,
multi_value_columns: Vec<&'a dyn IMultiValueColumn>,
pub const fn new(
name: &'static str,
prefix: Option<u8>,
existential_column: Option<&'a dyn ISingleValueColumn>,
columns: &'a [&'a dyn ISingleValueColumn],
multi_value_columns: &'a [&'a dyn IMultiValueColumn],
) -> Self {
// check unicity of prefixes and suffixes
#[cfg(test)]
{
let mut prefixes = HashSet::from([prefix]);
let mut suffixes = HashSet::from([existential_column.suffix()]);
for column in columns.iter() {
if !suffixes.insert(column.suffix()) {
panic!("duplicate suffix {} !!! check the code", column.suffix());
}
if prefix.is_none() {
if existential_column.is_some() {
panic!("cannot have an existential_column without a prefix");
}
for mvc in multi_value_columns.iter() {
if !prefixes.insert(mvc.prefix()) {
panic!("duplicate prefix {} !!! check the code", mvc.prefix());
}
if columns.len() > 0 {
panic!("cannot have some property columns without a prefix");
}
}
Self {
columns,
name,
multi_value_columns,
prefix,
existential_column,
}
}
/// check unicity of prefixes and suffixes
#[cfg(debug_assertions)]
pub fn check(&self) {
let mut prefixes = if self.prefix.is_some() {
HashSet::from([self.prefix.unwrap()])
} else {
HashSet::new()
};
let mut suffixes = if self.existential_column.is_some() {
HashSet::from([self.existential_column.unwrap().suffix()])
} else {
HashSet::new()
};
let name = self.name;
//log_debug!("CHECKING CLASS {name}");
for column in self.columns.iter() {
//log_debug!("INSERTING SUFFIX {}", column.suffix());
if !suffixes.insert(column.suffix()) {
panic!(
"duplicate suffix {} in {name}!!! check the code",
column.suffix() as char
);
}
}
//log_debug!("SUFFIXES {:?}", suffixes);
for mvc in self.multi_value_columns.iter() {
//log_debug!("INSERTING PREFIX {}", mvc.prefix());
if !prefixes.insert(mvc.prefix()) {
panic!(
"duplicate prefix {} in {name}!!! check the code",
mvc.prefix() as char
);
}
}
//log_debug!("PREFIXES {:?}", prefixes);
}
pub fn prefixes(&self) -> Vec<u8> {
let mut res: Vec<u8> = self
.multi_value_columns
.iter()
.map(|c| c.prefix())
.collect();
if self.prefix.is_some() {
res.push(self.prefix.unwrap());
}
res
}
fn suffices(&self) -> Vec<u8> {
let mut res: Vec<u8> = self.columns.iter().map(|c| c.suffix()).collect();
res.push(self.existential_column.suffix());
if self.existential_column.is_some() {
res.push(self.existential_column.unwrap().suffix());
}
res
}
}
pub fn format_type_of<T>(_: &T) -> String {
format!("{}", std::any::type_name::<T>())
}
pub trait IModel {
fn key(&self) -> &Vec<u8>;
fn prefix(&self) -> u8 {
self.class().prefix
self.class().prefix.unwrap()
}
fn check_exists(&mut self) -> Result<(), StorageError> {
if !self.exists() {
@ -93,17 +145,20 @@ pub trait IModel {
}
Ok(())
}
fn existential(&mut self) -> &mut dyn IExistentialValue;
fn existential(&mut self) -> Option<&mut dyn IExistentialValue>;
fn exists(&mut self) -> bool {
if self.existential().exists() {
if self.existential().is_none() || self.class().existential_column.is_none() {
return true;
}
if self.existential().as_mut().unwrap().exists() {
return true;
}
let prefix = self.prefix();
let key = self.key();
let suffix = self.class().existential_column.suffix();
let suffix = self.class().existential_column.unwrap().suffix();
match self.storage().get(prefix, key, Some(suffix), &None) {
Ok(res) => {
self.existential().process_exists(res);
self.existential().as_mut().unwrap().process_exists(res);
true
}
Err(e) => false,
@ -111,6 +166,9 @@ pub trait IModel {
}
fn storage(&self) -> &dyn KCVStorage;
fn load_props(&self) -> Result<HashMap<u8, Vec<u8>>, StorageError> {
if self.class().prefix.is_none() {
panic!("cannot call load_props on a Class without prefix");
}
self.storage().get_all_properties_of_key(
self.prefix(),
self.key().to_vec(),
@ -121,10 +179,12 @@ pub trait IModel {
fn class(&self) -> &Class;
fn del(&self) -> Result<(), StorageError> {
self.storage().write_transaction(&mut |tx| {
tx.del_all(self.prefix(), self.key(), &self.class().suffices(), &None)?;
if self.class().prefix.is_some() {
tx.del_all(self.prefix(), self.key(), &self.class().suffices(), &None)?;
}
for mvc in self.class().multi_value_columns.iter() {
let size = mvc.value_size()?;
tx.del_all_values(self.prefix(), self.key(), size, None, &None)?;
tx.del_all_values(mvc.prefix(), self.key(), size, None, &None)?;
}
Ok(())
})?;
@ -139,7 +199,7 @@ pub struct MultiValueColumn<
prefix: u8,
phantom: PhantomData<Column>,
model: PhantomData<Model>,
value_size: usize,
//value_size: usize,
}
impl<
@ -147,14 +207,11 @@ impl<
Column: Eq + PartialEq + Hash + Serialize + Default + for<'d> Deserialize<'d>,
> MultiValueColumn<Model, Column>
{
pub fn new(prefix: u8) -> Self {
pub const fn new(prefix: u8) -> Self {
MultiValueColumn {
prefix,
phantom: PhantomData,
model: PhantomData,
value_size: to_vec(&Column::default())
.expect("serialization of default Column value")
.len(),
}
}
@ -191,7 +248,7 @@ impl<
let key_prefix = model.key();
let key_prefix_len = key_prefix.len();
let mut res: HashSet<Column> = HashSet::new();
let total_size = key_prefix_len + self.value_size;
let total_size = key_prefix_len + self.value_size()?;
for val in model.storage().get_all_keys_and_values(
self.prefix,
total_size,
@ -229,7 +286,7 @@ pub struct MultiMapColumn<
phantom_column: PhantomData<Column>,
phantom_model: PhantomData<Model>,
phantom_value: PhantomData<Value>,
value_size: usize,
//value_size: usize,
}
impl<
@ -238,15 +295,12 @@ impl<
Value: Serialize + for<'a> Deserialize<'a>,
> MultiMapColumn<Model, Column, Value>
{
pub fn new(prefix: u8) -> Self {
pub const fn new(prefix: u8) -> Self {
MultiMapColumn {
prefix,
phantom_column: PhantomData,
phantom_model: PhantomData,
phantom_value: PhantomData,
value_size: to_vec(&Column::default())
.expect("serialization of default Column value")
.len(),
}
}
pub fn add(
@ -312,7 +366,7 @@ impl<
let key_prefix = model.key();
let key_prefix_len = key_prefix.len();
let mut res: HashMap<Column, Value> = HashMap::new();
let total_size = key_prefix_len + self.value_size;
let total_size = key_prefix_len + self.value_size()?;
for val in model.storage().get_all_keys_and_values(
self.prefix,
total_size,
@ -363,7 +417,7 @@ impl ISingleValueColumn for ExistentialValueColumn {
}
impl ExistentialValueColumn {
pub fn new(suffix: u8) -> Self {
pub const fn new(suffix: u8) -> Self {
ExistentialValueColumn { suffix }
}
}
@ -383,7 +437,7 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> ISingleValueColu
}
impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColumn<Model, Value> {
pub fn new(suffix: u8) -> Self {
pub const fn new(suffix: u8) -> Self {
SingleValueColumn {
suffix,
phantom_value: PhantomData,
@ -423,13 +477,14 @@ impl<Model: IModel, Value: Serialize + for<'d> Deserialize<'d>> SingleValueColum
)
}
pub fn del(
&self,
model: &mut Model,
tx: &mut dyn WriteTransaction,
) -> Result<(), StorageError> {
tx.del(model.prefix(), model.key(), Some(self.suffix), &None)
}
// should call the Model::del() instead
// pub fn del(
// &self,
// model: &mut Model,
// tx: &mut dyn WriteTransaction,
// ) -> Result<(), StorageError> {
// tx.del(model.prefix(), model.key(), Some(self.suffix), &None)
// }
}
pub struct ExistentialValue<Column: Serialize + for<'d> Deserialize<'d>> {
@ -459,22 +514,23 @@ impl<Column: Clone + Serialize + for<'d> Deserialize<'d>> ExistentialValue<Colum
}
}
pub fn set<Model: IModel>(
&mut self,
value: &Column,
model: &Model,
) -> Result<(), StorageError> {
pub fn set(&mut self, value: &Column) -> Result<(), StorageError> {
if self.value.is_some() {
return Err(StorageError::AlreadyExists);
}
self.value = Some(value.clone());
Ok(())
}
pub fn save<Model: IModel>(model: &Model, value: &Column) -> Result<(), StorageError> {
model.storage().replace(
model.prefix(),
model.key(),
Some(model.class().existential_column.suffix()),
Some(model.class().existential_column.unwrap().suffix()),
&to_vec(value)?,
&None,
)?;
self.value = Some(value.clone());
Ok(())
}

@ -307,6 +307,9 @@ pub struct RocksDbKCVStorage {
db: TransactionDB,
/// path for the storage backend data
path: String,
#[cfg(debug_assertions)]
pub classes: Vec<(String, Vec<u8>)>,
}
fn compare<T: Ord>(a: &[T], b: &[T]) -> std::cmp::Ordering {
@ -708,6 +711,34 @@ impl RocksDbKCVStorage {
Ok(RocksDbKCVStorage {
db: db,
path: path.to_str().unwrap().to_string(),
#[cfg(debug_assertions)]
classes: Vec::new(),
})
}
#[cfg(debug_assertions)]
pub fn add_class(&mut self, class: &Class) {
class.check();
self.classes
.push((class.name.to_string(), class.prefixes()));
}
#[cfg(debug_assertions)]
pub fn check_prefixes(&self) {
use std::collections::HashSet;
//log_debug!("CHECKING PREFIXES");
let mut all_prefixes = HashSet::new();
for (class, prefixes) in self.classes.iter() {
//log_debug!("CHECKING CLASS {class}");
for prefix in prefixes {
//log_debug!("CHECKING PREFIX {prefix}");
if !all_prefixes.insert(prefix) {
panic!(
"duplicate prefix {} for class {class} !!! check the code",
*prefix as char
);
}
}
}
}
}

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Branch storage on disk
//! Branch storage OKM (Object Key/Col/Value Mapping)
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;

@ -7,7 +7,7 @@
// notice may not be copied, modified, or distributed except
// according to those terms.
//! Repo storage on disk
//! Repo storage OKM (Object Key/Col/Value Mapping)
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;

Loading…
Cancel
Save