replay events in webapp after reload and before sending outbox

Niko PLP 2 weeks ago
parent 75a18fce2f
commit 4440f4467e
  1. 1
      Cargo.lock
  2. 12
      nextgraph/src/local_broker.rs
  3. 2
      ng-net/src/lib.rs
  4. 2
      ng-net/src/types.rs
  5. 1
      ng-repo/Cargo.toml
  6. 26
      ng-repo/src/branch.rs
  7. 30
      ng-repo/src/commit.rs
  8. 63
      ng-repo/src/errors.rs
  9. 87
      ng-repo/src/event.rs
  10. 7
      ng-repo/src/object.rs
  11. 19
      ng-repo/src/repo.rs
  12. 85
      ng-repo/src/store.rs
  13. 98
      ng-repo/src/types.rs
  14. 515
      ng-verifier/src/commits/mod.rs
  15. 2
      ng-verifier/src/lib.rs
  16. 10
      ng-verifier/src/rocksdb_user_storage.rs
  17. 2
      ng-verifier/src/site.rs
  18. 6
      ng-verifier/src/user_storage/branch.rs
  19. 43
      ng-verifier/src/user_storage/repo.rs
  20. 18
      ng-verifier/src/user_storage/storage.rs
  21. 484
      ng-verifier/src/verifier.rs

1
Cargo.lock generated

@ -3344,6 +3344,7 @@ dependencies = [
"base64-url",
"blake3",
"chacha20",
"crypto_box",
"current_platform",
"curve25519-dalek 3.2.0",
"debug_print",

@ -732,11 +732,11 @@ impl LocalBroker {
"NextGraph user_master_key BLAKE3 key",
key_material.as_slice(),
);
log_info!(
"USER MASTER KEY {user_id} {} {:?}",
user_id.to_hash_string(),
key
);
// log_info!(
// "USER MASTER KEY {user_id} {} {:?}",
// user_id.to_hash_string(),
// key
// );
key_material.zeroize();
let mut verifier = Verifier::new(
VerifierConfig {
@ -1427,7 +1427,7 @@ pub async fn doc_fetch(
session.verifier.doc_fetch(nuri, payload)
}
/// retrieves the ID of the one of the 3 stores of a the personal Site (3P: public, protected, or private)
/// retrieves the ID of one of the 3 stores of a the personal Site (3P: public, protected, or private)
pub async fn personal_site_store(session_id: u8, store: SiteStoreType) -> Result<PubKey, NgError> {
let broker = match LOCAL_BROKER.get() {
None | Some(Err(_)) => return Err(NgError::LocalBrokerNotInitialized),

@ -8,8 +8,6 @@
* notice may not be copied, modified, or distributed except
* according to those terms.
*/
//#[macro_use]
//extern crate ng_repo;
pub mod types;

@ -4005,6 +4005,8 @@ pub struct ReadBranchLinkV0 {
pub branch: BranchId, // must match the one in read_cap
pub topic: TopicId,
/// an optional list of heads that can be fetched in this branch
/// useful if a specific head is to be shared
pub heads: Vec<ObjectRef>,

@ -44,6 +44,7 @@ once_cell = "1.17.1"
serde_json = "1.0"
os_info = "3"
current_platform = "0.2.0"
crypto_box = { version = "0.8.2", features = ["seal"] }
[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
debug_print = "1.0.0"

@ -50,13 +50,12 @@ impl Branch {
/// RepoWriteCapSecret, TopicId, BranchId )
/// so that only editors of the repo can decrypt the privkey
/// nonce = 0
pub fn encrypt_topic_priv_key(
privkey: &BranchWriteCapSecret,
fn encrypt_topic_priv_key(
mut plaintext: Vec<u8>,
topic_id: TopicId,
branch_id: BranchId,
repo_write_cap_secret: &RepoWriteCapSecret,
) -> Vec<u8> {
let mut plaintext = serde_bare::to_vec(privkey).unwrap();
let repo_write_cap_secret = serde_bare::to_vec(repo_write_cap_secret).unwrap();
let topic_id = serde_bare::to_vec(&topic_id).unwrap();
let branch_id = serde_bare::to_vec(&branch_id).unwrap();
@ -71,6 +70,27 @@ impl Branch {
plaintext
}
pub fn encrypt_branch_write_cap_secret(
privkey: &BranchWriteCapSecret,
topic_id: TopicId,
branch_id: BranchId,
repo_write_cap_secret: &RepoWriteCapSecret,
) -> Vec<u8> {
let plaintext = serde_bare::to_vec(privkey).unwrap();
Branch::encrypt_topic_priv_key(plaintext, topic_id, branch_id, repo_write_cap_secret)
}
pub fn decrypt_branch_write_cap_secret(
ciphertext: Vec<u8>,
topic_id: TopicId,
branch_id: BranchId,
repo_write_cap_secret: &RepoWriteCapSecret,
) -> Result<BranchWriteCapSecret, NgError> {
let plaintext =
Branch::encrypt_topic_priv_key(ciphertext, topic_id, branch_id, repo_write_cap_secret);
Ok(serde_bare::from_slice(&plaintext)?)
}
pub fn new(
id: PubKey,
repo: ObjectRef,

@ -424,6 +424,11 @@ impl Commit {
}
}
/// Get quorum_type
pub fn quorum_type(&self) -> &QuorumType {
&self.content_v0().quorum
}
/// Get commit content
pub fn content(&self) -> &CommitContent {
match self {
@ -469,7 +474,7 @@ impl Commit {
}
Err(CommitLoadError::HeaderLoadError)
}
CommitBody::V0(CommitBodyV0::Delete) => Ok(true),
CommitBody::V0(CommitBodyV0::Delete(_)) => Ok(true),
_ => Ok(false),
}
}
@ -761,6 +766,15 @@ impl CommitBody {
Ok((obj.reference().unwrap(), blocks))
}
pub fn is_add_signer_cap(&self) -> bool {
match self {
Self::V0(v0) => match v0 {
CommitBodyV0::AddSignerCap(_) => true,
_ => false,
},
}
}
pub fn root_branch_commit(&self) -> Result<&RootBranch, CommitLoadError> {
match self {
Self::V0(v0) => match v0 {
@ -806,7 +820,7 @@ impl CommitBody {
CommitBodyV0::RefreshReadCap(_) => true,
CommitBodyV0::RefreshWriteCap(_) => true,
CommitBodyV0::SyncSignature(_) => true,
CommitBodyV0::Delete => true,
CommitBodyV0::Delete(_) => true,
_ => false,
},
}
@ -970,7 +984,7 @@ impl CommitBody {
],
CommitBodyV0::RefreshReadCap(_) => vec![PermissionV0::RefreshReadCap],
CommitBodyV0::RefreshWriteCap(_) => vec![PermissionV0::RefreshWriteCap],
CommitBodyV0::Delete => vec![],
CommitBodyV0::Delete(_) => vec![],
CommitBodyV0::AddRepo(_)
| CommitBodyV0::RemoveRepo(_)
| CommitBodyV0::AddLink(_)
@ -1367,6 +1381,16 @@ impl fmt::Display for CommitBody {
// write!(f, "RefreshWriteCap {}", b)
// }
CommitBodyV0::SyncSignature(b) => write!(f, "SyncSignature {}", b),
//CommitBodyV0::AddRepo(b) => write!(f, "AddRepo {}", b),
//CommitBodyV0::RemoveRepo(b) => write!(f, "RemoveRepo {}", b),
CommitBodyV0::AddSignerCap(b) => write!(f, "AddSignerCap {}", b),
CommitBodyV0::StoreUpdate(b) => write!(f, "StoreUpdate {}", b),
/* AddLink(AddLink),
RemoveLink(RemoveLink),
AddSignerCap(AddSignerCap),
RemoveSignerCap(RemoveSignerCap),
WalletUpdate(WalletUpdate),
StoreUpdate(StoreUpdate), */
_ => unimplemented!(),
}
}

@ -9,7 +9,7 @@
//! Errors
use crate::commit::{CommitLoadError, CommitVerifyError};
pub use crate::commit::{CommitLoadError, CommitVerifyError};
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
@ -24,6 +24,7 @@ pub enum NgError {
IncompleteSignature,
SerializationError,
EncryptionError,
DecryptionError,
InvalidValue,
ConnectionNotFound,
InvalidKey,
@ -60,6 +61,7 @@ pub enum NgError {
ServerError(ServerError),
InvalidResponse,
NotAServerError,
VerifierError(VerifierError),
}
impl Error for NgError {}
@ -145,8 +147,14 @@ impl From<StorageError> for NgError {
}
}
impl From<VerifierError> for NgError {
fn from(e: VerifierError) -> Self {
NgError::VerifierError(e)
}
}
/// Object parsing errors
#[derive(Debug)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ObjectParseError {
/// Missing blocks
MissingBlocks(Vec<BlockId>),
@ -214,6 +222,57 @@ impl ServerError {
}
}
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum VerifierError {
MalformedDag,
MissingCommitInDag,
CommitBodyNotFound,
InvalidKey,
OtherError(String),
CommitLoadError(CommitLoadError),
InvalidRepositoryCommit,
MissingRepoWriteCapSecret,
StorageError(StorageError),
ObjectParseError(ObjectParseError),
NotImplemented,
InvalidSignatureObject,
MalformedSyncSignatureAcks,
MalformedSyncSignatureDeps,
TopicNotFound,
RepoNotFound,
InvalidBranch,
NoBlockStorageAvailable,
RootBranchNotFound,
}
impl From<NgError> for VerifierError {
fn from(e: NgError) -> Self {
match e {
NgError::InvalidKey => VerifierError::InvalidKey,
NgError::RepoNotFound => VerifierError::RepoNotFound,
_ => VerifierError::OtherError(e.to_string()),
}
}
}
impl From<CommitLoadError> for VerifierError {
fn from(e: CommitLoadError) -> Self {
VerifierError::CommitLoadError(e)
}
}
impl From<ObjectParseError> for VerifierError {
fn from(e: ObjectParseError) -> Self {
VerifierError::ObjectParseError(e)
}
}
impl From<StorageError> for VerifierError {
fn from(e: StorageError) -> Self {
VerifierError::StorageError(e)
}
}
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, IntoPrimitive, Clone)]
#[repr(u16)]
pub enum NetError {

@ -13,12 +13,14 @@ use zeroize::Zeroize;
use crate::block_storage::*;
use crate::errors::*;
use crate::object::*;
use crate::repo::BranchInfo;
use crate::repo::Repo;
use crate::store::Store;
use crate::types::*;
use crate::utils::*;
use core::fmt;
use std::sync::Arc;
use std::sync::RwLockWriteGuard;
use chacha20::cipher::{KeyIvInit, StreamCipher};
use chacha20::ChaCha20;
@ -88,6 +90,38 @@ impl Event {
Event::V0(v0) => &v0.content.topic,
}
}
/// opens an event with the key derived from information kept in Repo.
///
/// returns the Commit object and optional list of additional block IDs.
/// Those blocks have been added to the storage of store of repo so they can be retrieved.
pub fn open_with_info(&self, repo: &Repo, branch: &BranchInfo) -> Result<Commit, NgError> {
match self {
Self::V0(v0) => v0.open_with_info(repo, branch),
}
}
pub fn open(
&self,
store: &Store,
repo_id: &RepoId,
branch_id: &BranchId,
branch_secret: &ReadCapSecret,
) -> Result<Commit, NgError> {
match self {
Self::V0(v0) => v0.open(store, repo_id, branch_id, branch_secret),
}
}
// pub fn put_blocks<'a>(
// &self,
// overlay: &OverlayId,
// storage: &RwLockWriteGuard<'a, dyn BlockStorage + Send + Sync + 'static>,
// ) -> Result<ObjectId, NgError> {
// match self {
// Self::V0(v0) => v0.put_blocks(overlay, storage),
// }
// }
}
impl EventV0 {
@ -166,4 +200,57 @@ impl EventV0 {
peer_sig,
})
}
// pub fn put_blocks<'a>(
// &self,
// overlay: &OverlayId,
// storage: &RwLockWriteGuard<'a, dyn BlockStorage + Send + Sync + 'static>,
// ) -> Result<ObjectId, NgError> {
// let mut first_id = None;
// for block in &self.content.blocks {
// let id = storage.put(overlay, block)?;
// if first_id.is_none() {
// first_id = Some(id)
// }
// }
// first_id.ok_or(NgError::CommitLoadError(CommitLoadError::NotACommitError))
// }
/// opens an event with the key derived from information kept in Repo.
///
/// returns the Commit object and optional list of additional block IDs.
/// Those blocks have been added to the storage of store of repo so they can be retrieved.
pub fn open_with_info(&self, repo: &Repo, branch: &BranchInfo) -> Result<Commit, NgError> {
self.open(&repo.store, &repo.id, &branch.id, &branch.read_cap.key)
}
pub fn open(
&self,
store: &Store,
repo_id: &RepoId,
branch_id: &BranchId,
branch_secret: &ReadCapSecret,
) -> Result<Commit, NgError> {
// TODO: verifier event signature
let publisher_pubkey = self.content.publisher.get_pub_key();
let key = Self::derive_key(repo_id, branch_id, branch_secret, &publisher_pubkey);
let mut encrypted_commit_key = self.content.key.clone();
let mut nonce = self.content.seq.to_le_bytes().to_vec();
nonce.append(&mut vec![0; 4]);
let mut cipher = ChaCha20::new((&key).into(), (nonce.as_slice()).into());
cipher.apply_keystream(encrypted_commit_key.as_mut_slice());
let commit_key: SymKey = encrypted_commit_key.as_slice().try_into()?;
let mut first_id = None;
for block in &self.content.blocks {
let id = store.put(block)?;
if first_id.is_none() {
first_id = Some(id)
}
}
let commit_ref = ObjectRef::from_id_key(first_id.unwrap(), commit_key);
Ok(Commit::load(commit_ref, &store, true)?)
}
}

@ -420,6 +420,13 @@ impl Object {
}
}
/// Load an Object from BlockStorage (taking a reference)
///
/// Returns Ok(Object) or an Err(ObjectParseError::MissingBlocks(Vec<ObjectId>)) of missing BlockIds
pub fn load_ref(reference: &ObjectRef, store: &Store) -> Result<Object, ObjectParseError> {
Self::load(reference.id.clone(), Some(reference.key.clone()), store)
}
/// Load an Object from BlockStorage
///
/// Returns Ok(Object) or an Err(ObjectParseError::MissingBlocks(Vec<ObjectId>)) of missing BlockIds

@ -39,6 +39,11 @@ impl Repository {
pub fn new(id: &PubKey, metadata: &Vec<u8>) -> Repository {
Repository::V0(RepositoryV0::new(id, metadata))
}
pub fn id(&self) -> &PubKey {
match self {
Self::V0(v0) => &v0.id,
}
}
}
#[derive(Debug)]
@ -206,6 +211,11 @@ impl Repo {
self.branches.get(id).ok_or(NgError::BranchNotFound)
}
pub fn branch_mut(&mut self, id: &BranchId) -> Result<&mut BranchInfo, NgError> {
//TODO: load the BranchInfo from storage
self.branches.get_mut(id).ok_or(NgError::BranchNotFound)
}
pub fn overlay_branch(&self) -> Option<&BranchInfo> {
for (_, branch) in self.branches.iter() {
if branch.branch_type == BranchType::Overlay {
@ -224,6 +234,15 @@ impl Repo {
None
}
pub fn root_branch(&self) -> Option<&BranchInfo> {
for (_, branch) in self.branches.iter() {
if branch.branch_type == BranchType::Root {
return Some(branch);
}
}
None
}
pub fn overlay_branch_read_cap(&self) -> Option<&ReadCap> {
match self.overlay_branch() {
Some(bi) => Some(&bi.read_cap),

@ -48,18 +48,31 @@ impl From<&Store> for StoreUpdate {
impl fmt::Debug for Store {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Store.\nstore_repo {:?}", self.store_repo)?;
writeln!(f, "store_readcap {:?}", self.store_readcap)?;
write!(f, "Store:\nstore_repo {}", self.store_repo)?;
writeln!(f, "store_readcap {}", self.store_readcap)?;
writeln!(
f,
"store_overlay_branch_readcap {:?}",
"store_overlay_branch_readcap {}",
self.store_overlay_branch_readcap
)?;
writeln!(f, "overlay_id {:?}", self.overlay_id)
writeln!(f, "overlay_id {}", self.overlay_id)
}
}
impl Store {
pub fn new_from(
update: &StoreUpdate,
storage: Arc<RwLock<dyn BlockStorage + Send + Sync>>,
) -> Store {
match update {
StoreUpdate::V0(v0) => Store::new(
v0.store,
v0.store_read_cap.clone(),
v0.overlay_branch_read_cap.clone(),
storage,
),
}
}
pub fn id(&self) -> &PubKey {
self.store_repo.repo_id()
}
@ -70,6 +83,10 @@ impl Store {
}
}
pub fn is_private(&self) -> bool {
self.store_repo.is_private()
}
pub fn get_store_repo(&self) -> &StoreRepo {
&self.store_repo
}
@ -82,6 +99,10 @@ impl Store {
&self.store_overlay_branch_readcap.key
}
pub fn get_store_overlay_branch_readcap(&self) -> &ReadCap {
&self.store_overlay_branch_readcap
}
pub fn get_store_readcap_secret(&self) -> &ReadCapSecret {
&self.store_readcap.key
}
@ -146,7 +167,7 @@ impl Store {
repo: repository_commit_ref,
root_branch_readcap_id,
topic: branch_topic_pub_key,
topic_privkey: Branch::encrypt_topic_priv_key(
topic_privkey: Branch::encrypt_branch_write_cap_secret(
&branch_topic_priv_key,
branch_topic_pub_key,
branch_pub_key,
@ -159,7 +180,7 @@ impl Store {
&branch_priv_key,
&branch_pub_key,
branch_pub_key,
QuorumType::NoSigning,
QuorumType::Owners,
vec![],
vec![],
branch_commit_body,
@ -176,6 +197,7 @@ impl Store {
CommitBody::V0(CommitBodyV0::AddBranch(AddBranch::V0(AddBranchV0 {
branch_type: branch_type.clone(),
topic_id: branch_topic_pub_key,
branch_id: branch_pub_key,
branch_read_cap: branch_read_cap.clone(),
})));
@ -239,7 +261,8 @@ impl Store {
is_store: bool,
is_private_store: bool,
) -> Result<(Repo, Vec<(Commit, Vec<Digest>)>), NgError> {
let mut events = Vec::with_capacity(6);
let mut events = Vec::with_capacity(9);
let mut events_postponed = Vec::with_capacity(6);
// creating the Repository commit
@ -278,7 +301,7 @@ impl Store {
store: (&self.store_repo).into(),
store_sig: None, //TODO: the store signature
topic: topic_pub_key,
topic_privkey: Branch::encrypt_topic_priv_key(
topic_privkey: Branch::encrypt_branch_write_cap_secret(
&topic_priv_key,
topic_pub_key,
repo_pub_key,
@ -288,7 +311,10 @@ impl Store {
quorum: None,
reconciliation_interval: RelTime::None,
owners: vec![creator.clone()],
//TODO: add crypto_box of repo_write_cap_secret for creator
owners_write_cap: vec![serde_bytes::ByteBuf::from(RootBranch::encrypt_write_cap(
creator,
&repo_write_cap_secret,
)?)],
metadata: vec![],
})));
@ -296,7 +322,7 @@ impl Store {
&repo_priv_key,
&repo_pub_key,
repo_pub_key,
QuorumType::NoSigning,
QuorumType::Owners,
vec![],
vec![repository_commit_ref.clone()],
root_branch_commit_body,
@ -327,7 +353,7 @@ impl Store {
vec![],
)?;
events.push((main_branch_commit, vec![]));
events_postponed.push((main_branch_commit, vec![]));
// TODO: optional AddMember and AddPermission, that should be added as deps to the SynSignature below (and to the commits of the SignatureContent)
// using the creator as author (and incrementing their peer's seq_num)
@ -347,7 +373,7 @@ impl Store {
vec![],
)?;
events.push((store_branch_commit, vec![]));
events_postponed.push((store_branch_commit, vec![]));
// creating the overlay or user branch
let (
@ -370,7 +396,7 @@ impl Store {
vec![],
)?;
events.push((overlay_or_user_branch_commit, vec![]));
events_postponed.push((overlay_or_user_branch_commit, vec![]));
Some((
store_add_branch_commit,
@ -398,20 +424,16 @@ impl Store {
// creating signature for RootBranch, AddBranch and Branch commits
// signed with owner threshold signature (threshold = 0)
let mut signed_commits = vec![
root_branch_readcap_id,
main_add_branch_commit.id().unwrap(),
main_branch_info.read_cap.id,
];
let mut signed_commits = vec![main_branch_info.read_cap.id];
if let Some((store_add_branch, store_branch, oou_add_branch, oou_branch)) = &extra_branches
{
if let Some((_, store_branch, oou_add_branch, oou_branch)) = &extra_branches {
signed_commits.append(&mut vec![
store_add_branch.id().unwrap(),
store_branch.read_cap.id,
oou_add_branch.id().unwrap(),
store_branch.read_cap.id,
oou_branch.read_cap.id,
]);
} else {
signed_commits.push(main_add_branch_commit.id().unwrap());
}
let signature_content = SignatureContent::V0(SignatureContentV0 {
@ -481,9 +503,11 @@ impl Store {
let sync_signature = SyncSignature::V0(sig_object.reference().unwrap());
// creating the SyncSignature for the root_branch with deps to the AddBranch and acks to the RootBranch commit as it is its direct causal future.
// creating the SyncSignature commit body (cloned for each branch)
let sync_sig_commit_body = CommitBody::V0(CommitBodyV0::SyncSignature(sync_signature));
// creating the SyncSignature commit for the root_branch with deps to the AddBranch and acks to the RootBranch commit as it is its direct causal future.
let sync_sig_on_root_branch_commit = Commit::new_with_body_acks_deps_and_save(
creator_priv_key,
creator,
@ -514,12 +538,7 @@ impl Store {
branches.push((oou_branch_info.id, oou_branch_info));
}
let sync_sig_on_root_branch_commit_ref =
sync_sig_on_root_branch_commit.reference().unwrap();
events.push((sync_sig_on_root_branch_commit, additional_blocks));
// creating the SyncSignature for the all branches with deps to the Branch commit and acks also to this commit as it is its direct causal future.
// creating the SyncSignature for all 3 branches with deps to the Branch commit and acks also to this commit as it is its direct causal future.
for (branch_id, branch_info) in &mut branches {
let sync_sig_on_branch_commit = Commit::new_with_body_acks_deps_and_save(
@ -542,13 +561,19 @@ impl Store {
additional_blocks.extend(cert_obj_blocks.iter());
additional_blocks.extend(sig_obj_blocks.iter());
events.push((sync_sig_on_branch_commit, additional_blocks));
events_postponed.push((sync_sig_on_branch_commit, additional_blocks));
branch_info.current_heads = vec![sync_sig_on_branch_commit_ref];
// TODO: add the CertificateRefresh event on main branch
}
let sync_sig_on_root_branch_commit_ref =
sync_sig_on_root_branch_commit.reference().unwrap();
events.push((sync_sig_on_root_branch_commit, additional_blocks));
events.extend(events_postponed);
// preparing the Repo
let root_branch = BranchInfo {

@ -585,7 +585,7 @@ impl fmt::Display for StoreOverlay {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "StoreOverlay V0")?;
write!(f, "StoreOverlay V0 ")?;
match v0 {
StoreOverlayV0::PublicStore(k) => writeln!(f, "PublicStore: {}", k),
StoreOverlayV0::ProtectedStore(k) => writeln!(f, "ProtectedStore: {}", k),
@ -669,7 +669,20 @@ pub enum StoreRepo {
impl fmt::Display for StoreRepo {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "StoreRepo V0 {}", self.repo_id())
writeln!(
f,
"StoreRepo V0 {} {}",
match self {
StoreRepo::V0(v0) => match v0 {
StoreRepoV0::PublicStore(_) => "PublicStore",
StoreRepoV0::ProtectedStore(_) => "ProtectedStore",
StoreRepoV0::PrivateStore(_) => "PrivateStore",
StoreRepoV0::Group(_) => "Group",
StoreRepoV0::Dialog(_) => "Dialog",
},
},
self.repo_id()
)
}
}
@ -696,6 +709,10 @@ impl StoreRepo {
StoreRepo::V0(StoreRepoV0::PublicStore(repo_pubkey))
}
pub fn new_private(repo_pubkey: PubKey) -> Self {
StoreRepo::V0(StoreRepoV0::PrivateStore(repo_pubkey))
}
pub fn outer_overlay(&self) -> OverlayId {
self.overlay_id_for_read_purpose()
}
@ -1095,9 +1112,12 @@ pub struct RootBranchV0 {
// list of owners. all of them are required to sign any RootBranch that modifies the list of owners or the inherit_perms_users_and_quorum_from_store field.
pub owners: Vec<UserId>,
/// Mutable App-specific metadata
/// when the list of owners is changed, a crypto_box containing the RepoWriteCapSecret should be included here for each owner.
/// this should also be done at creation time, with the UserId of the first owner, except for individual private store repo, because it doesnt have a RepoWriteCapSecret
/// the vector has the same order and size as the owners one. each owner finds their write_cap here.
pub owners_write_cap: Vec<serde_bytes::ByteBuf>,
/// Mutable App-specific metadata
#[serde(with = "serde_bytes")]
pub metadata: Vec<u8>,
}
@ -1114,6 +1134,26 @@ impl RootBranch {
Self::V0(v0) => &v0.owners,
}
}
pub fn encrypt_write_cap(
for_user: &UserId,
write_cap: &RepoWriteCapSecret,
) -> Result<Vec<u8>, NgError> {
let ser = serde_bare::to_vec(write_cap).unwrap();
let mut rng = crypto_box::aead::OsRng {};
let cipher = crypto_box::seal(&mut rng, &for_user.to_dh_slice().into(), &ser)
.map_err(|_| NgError::EncryptionError)?;
Ok(cipher)
}
pub fn decrypt_write_cap(
by_user: &PrivKey,
cipher: &Vec<u8>,
) -> Result<RepoWriteCapSecret, NgError> {
let ser = crypto_box::seal_open(&(*by_user.to_dh().slice()).into(), cipher)
.map_err(|_| NgError::DecryptionError)?;
let write_cap: RepoWriteCapSecret =
serde_bare::from_slice(&ser).map_err(|_| NgError::SerializationError)?;
Ok(write_cap)
}
}
impl fmt::Display for RootBranch {
@ -1131,7 +1171,7 @@ impl fmt::Display for RootBranch {
.as_ref()
.map_or("None".to_string(), |c| format!("{}", c))
)?;
writeln!(f, "topic: {}", v0.repo)?;
writeln!(f, "topic: {}", v0.topic)?;
writeln!(
f,
"inherit_perms: {}",
@ -1229,7 +1269,7 @@ pub struct BranchV0 {
/// BLAKE3 derive_key ("NextGraph Branch WriteCap Secret BLAKE3 key",
/// RepoWriteCapSecret, TopicId, BranchId )
/// so that only editors of the repo can decrypt the privkey
/// not encrypted for individual store repo.
/// For individual store repo, the RepoWriteCapSecret is zero
#[serde(with = "serde_bytes")]
pub topic_privkey: Vec<u8>,
@ -1269,6 +1309,7 @@ pub enum BranchType {
User,
Transactional, // this could have been called OtherTransactional, but for the sake of simplicity, we use Transactional for any branch that is not the Main one.
Root, // only used for BranchInfo
//Unknown, // only used temporarily when loading a branch info from commits (Branch commit, then AddBranch commit)
}
impl fmt::Display for BranchType {
@ -1284,6 +1325,7 @@ impl fmt::Display for BranchType {
Self::User => "User",
Self::Transactional => "Transactional",
Self::Root => "Root",
//Self::Unknown => "==unknown==",
}
)
}
@ -1298,6 +1340,8 @@ pub struct AddBranchV0 {
/// in order to subscribe to the pub/sub). should be identical to the one in the Branch definition
pub topic_id: TopicId,
pub branch_id: BranchId,
pub branch_type: BranchType,
// the new branch definition commit
@ -1644,6 +1688,19 @@ pub enum AddSignerCap {
V0(AddSignerCapV0),
}
impl fmt::Display for AddSignerCap {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "V0")?;
writeln!(f, "cap: {:?}", v0.cap)?;
Ok(())
}
}
}
}
/// Removes a SignerCap from the `user` branch.
///
/// DEPS to the previous AddSignerCap commit(s) (ORset logic) with matching repo_id
@ -1702,6 +1759,24 @@ pub enum StoreUpdate {
V0(StoreUpdateV0),
}
impl fmt::Display for StoreUpdate {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::V0(v0) => {
writeln!(f, "V0")?;
writeln!(f, "store: {}", v0.store)?;
writeln!(f, "store_read_cap: {}", v0.store_read_cap)?;
write!(
f,
"overlay_branch_read_cap: {}",
v0.overlay_branch_read_cap
)?;
Ok(())
}
}
}
}
//
// Commits on transaction branches
//
@ -1835,7 +1910,7 @@ pub enum SyncSignature {
}
impl SyncSignature {
pub fn verify(&self) -> bool {
pub fn verify_quorum(&self) -> bool {
// check that the signature object referenced here, is of type threshold_sig Total or Owner
unimplemented!();
}
@ -2074,7 +2149,7 @@ pub enum CommitBodyV0 {
RemoveBranch(RemoveBranch),
AddName(AddName),
RemoveName(RemoveName),
Delete, // signed with owners key. Deleted the repo
Delete(()), // signed with owners key. Deletes the repo
// TODO? Quorum(Quorum), // changes the quorum without changing the RootBranch
@ -2396,6 +2471,15 @@ impl fmt::Display for PeerId {
}
}
impl PeerId {
pub fn get_pub_key(&self) -> PubKey {
match self {
Self::Direct(pk) | Self::Forwarded(pk) => pk.clone(),
_ => panic!("cannot get a pubkey for ForwardedObfuscated"),
}
}
}
/// Content of EventV0
///
/// Contains the objects of newly published Commit, its optional blocks, and optional FILES and their blocks.

@ -0,0 +1,515 @@
// Copyright (c) 2022-2024 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
use crate::verifier::Verifier;
use ng_repo::errors::VerifierError;
use ng_repo::log::*;
use ng_repo::object::Object;
use ng_repo::repo::{BranchInfo, Repo};
use ng_repo::store::Store;
use ng_repo::types::*;
use std::collections::HashMap;
use std::sync::Arc;
pub trait CommitVerifier {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError>;
}
fn list_dep_chain_until(
start: ObjectRef,
end: &ObjectId,
store: &Store,
) -> Result<Vec<Commit>, VerifierError> {
let mut res = vec![];
let mut pos = start;
loop {
let pos_id = pos.id.clone();
if pos_id == *end {
break;
}
let commit = Commit::load(pos, &store, true)?;
let deps = commit.deps();
if deps.len() != 1 {
return Err(VerifierError::MalformedSyncSignatureDeps);
}
res.push(commit);
pos = deps[0].clone();
}
res.reverse();
Ok(res)
}
impl CommitVerifier for RootBranch {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
match self {
RootBranch::V0(root_branch) => {
let repository_commit = Commit::load(root_branch.repo.clone(), &store, true)?;
let repository = match repository_commit
.body()
.ok_or(VerifierError::CommitBodyNotFound)?
{
CommitBody::V0(CommitBodyV0::Repository(r)) => r,
_ => return Err(VerifierError::InvalidRepositoryCommit),
};
//TODO: deal with quorum_type (verify signature)
let user_priv = verifier.user_privkey();
let user_id = user_priv.to_pub();
let repo_write_cap_secret = if store.is_private() {
Some(SymKey::nil())
} else if let Some(pos) = root_branch.owners.iter().position(|&o| o == user_id) {
let cryptobox = &root_branch.owners_write_cap[pos];
Some(RootBranch::decrypt_write_cap(user_priv, cryptobox)?)
} else {
None
};
let topic_priv_key = if let Some(rwcs) = repo_write_cap_secret.as_ref() {
Branch::decrypt_branch_write_cap_secret(
root_branch.topic_privkey.clone(),
root_branch.topic.clone(),
root_branch.id.clone(),
rwcs,
)
.map_or(None, |k| Some(k))
} else {
None
};
let reference = commit.reference().unwrap();
let root_branch = BranchInfo {
id: root_branch.id.clone(),
branch_type: BranchType::Root,
topic: root_branch.topic,
topic_priv_key,
read_cap: reference.clone(),
current_heads: vec![reference.clone()],
};
let id = root_branch.id;
let branches = vec![(root_branch.id, root_branch)];
let repo = Repo {
id,
repo_def: repository.clone(),
signer: None, //TO BE ADDED LATER when AddSignerCap commit is found
members: HashMap::new(),
store: Arc::clone(&store),
read_cap: Some(reference),
write_cap: repo_write_cap_secret,
branches: branches.into_iter().collect(),
opened_branches: HashMap::new(),
};
let _repo_ref = verifier.add_repo_and_save(repo);
}
}
Ok(())
}
}
impl CommitVerifier for Branch {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
match self {
Branch::V0(branch) => {
//TODO: deal with root_branch_readcap_id (the epoch)
//TODO: deal with quorum_type (verify signature)
let repository_commit = Commit::load(branch.repo.clone(), &store, true)?;
let repository = match repository_commit
.body()
.ok_or(VerifierError::CommitBodyNotFound)?
{
CommitBody::V0(CommitBodyV0::Repository(r)) => r,
_ => return Err(VerifierError::InvalidRepositoryCommit),
};
// check that the repository exists
let repo = verifier.get_repo_mut(repository.id(), store.get_store_repo())?;
let topic_priv_key = if let Some(rwcs) = repo.write_cap.as_ref() {
Branch::decrypt_branch_write_cap_secret(
branch.topic_privkey.clone(),
branch.topic.clone(),
branch.id.clone(),
rwcs,
)
.map_or(None, |k| Some(k))
} else {
None
};
let reference = commit.reference().unwrap();
let branch_info = repo.branch_mut(&branch.id)?;
if branch_info.read_cap != reference {
return Err(VerifierError::InvalidBranch);
}
branch_info.topic_priv_key = topic_priv_key;
branch_info.current_heads = vec![reference];
verifier.update_branch(&repository.id(), &branch.id, store.get_store_repo())?;
Ok(())
}
}
}
}
impl CommitVerifier for SyncSignature {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
match self {
SyncSignature::V0(signature_ref) => {
let sign = Object::load_ref(signature_ref, &store)?;
match sign.content_v0()? {
ObjectContentV0::Signature(sig) => {
//TODO: verify signature
}
_ => return Err(VerifierError::InvalidSignatureObject),
}
// process each deps
let acks = commit.acks();
if acks.len() != 1 {
return Err(VerifierError::MalformedSyncSignatureAcks);
}
let ack = &acks[0];
let deps = commit.deps();
if deps.len() != 1 {
return Err(VerifierError::MalformedSyncSignatureDeps);
}
let commits = list_dep_chain_until(deps[0].clone(), &ack.id, &store)?;
for commit in commits {
verifier.verify_commit(commit, Arc::clone(&store))?;
}
}
}
Ok(())
}
}
impl CommitVerifier for AddBranch {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
match self {
AddBranch::V0(v0) => {
if v0.branch_type == BranchType::Root {
return Err(VerifierError::InvalidBranch);
}
// let _ = verifier.topics.insert(
// (store.inner_overlay(), v0.topic_id),
// (*commit.branch(), v0.branch_id),
// );
let branch_info = BranchInfo {
id: v0.branch_id,
branch_type: v0.branch_type.clone(),
topic: v0.topic_id,
topic_priv_key: None,
read_cap: v0.branch_read_cap.clone(),
current_heads: vec![],
};
verifier.add_branch_and_save(
commit.branch(),
branch_info,
store.get_store_repo(),
)?;
}
}
Ok(())
}
}
impl CommitVerifier for Repository {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
// left empty intentionally
Ok(())
}
}
impl CommitVerifier for StoreUpdate {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
verifier.new_store_from_update(self)
}
}
impl CommitVerifier for AddSignerCap {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
match self {
AddSignerCap::V0(v0) => verifier.update_signer_cap(&v0.cap),
}
}
}
impl CommitVerifier for AddMember {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveMember {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AddPermission {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemovePermission {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveBranch {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AddName {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveName {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for () {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for Snapshot {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AddFile {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveFile {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for Compact {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AsyncSignature {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RefreshReadCap {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RefreshWriteCap {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AddRepo {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveRepo {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for AddLink {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveLink {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for RemoveSignerCap {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}
impl CommitVerifier for WalletUpdate {
fn verify(
&self,
commit: &Commit,
verifier: &mut Verifier,
store: Arc<Store>,
) -> Result<(), VerifierError> {
Ok(())
}
}

@ -6,5 +6,7 @@ pub mod verifier;
pub mod site;
pub mod commits;
#[cfg(not(target_family = "wasm"))]
pub mod rocksdb_user_storage;

@ -14,7 +14,7 @@ use crate::user_storage::repo::RepoStorage;
use crate::user_storage::*;
use either::Either::{Left, Right};
use ng_repo::block_storage::BlockStorage;
use ng_repo::repo::Repo;
use ng_repo::repo::{BranchInfo, Repo};
use ng_repo::store::Store;
use ng_repo::{errors::StorageError, types::*};
use ng_storage_rocksdb::kcv_storage::RocksdbKCVStorage;
@ -67,4 +67,12 @@ impl UserStorage for RocksDbUserStorage {
RepoStorage::create_from_repo(repo, &self.user_storage)?;
Ok(())
}
fn add_branch(&self, repo_id: &RepoId, branch_info: &BranchInfo) -> Result<(), StorageError> {
RepoStorage::add_branch_from_info(repo_id, branch_info, &self.user_storage)
}
fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError> {
RepoStorage::update_signer_cap(signer_cap, &self.user_storage)
}
}

@ -208,7 +208,7 @@ impl SiteV0 {
// update the current_heads
verifier.update_current_heads(&private_repo_id, &user_branch_id, vec![current_head])?;
// sending the 5 events
// sending the events
verifier
.new_events(commits, private_repo_id, &private_store_repo)
.await?;

@ -100,9 +100,9 @@ impl<'a> BranchStorage<'a> {
id: id.clone(),
storage,
};
if bs.exists() {
return Err(StorageError::AlreadyExists);
}
// if bs.exists() {
// return Err(StorageError::AlreadyExists);
// }
storage.write_transaction(&mut |tx| {
let id_ser = to_vec(&id)?;

@ -121,6 +121,49 @@ impl<'a> RepoStorage<'a> {
)
}
pub fn add_branch_from_info(
repo_id: &RepoId,
branch_info: &BranchInfo,
storage: &'a dyn KCVStorage,
) -> Result<(), StorageError> {
BranchStorage::create_from_info(branch_info, storage)?;
storage.write_transaction(&mut |tx| {
let repo_id_ser = to_vec(&repo_id)?;
let branch_id_ser = to_vec(&branch_info.id)?;
let mut key = Vec::with_capacity(repo_id_ser.len() + branch_id_ser.len());
key.append(&mut repo_id_ser.clone());
key.append(&mut branch_id_ser.clone());
tx.put(Self::PREFIX_BRANCHES, &key, None, &vec![], &None)?;
if branch_info.branch_type == BranchType::Store {
tx.put(
Self::PREFIX,
&repo_id_ser,
Some(Self::STORE_BRANCH),
&branch_id_ser,
&None,
)?;
}
Ok(())
})?;
Ok(())
}
pub fn update_signer_cap(
signer_cap: &SignerCap,
storage: &'a dyn KCVStorage,
) -> Result<(), StorageError> {
let repo_id = signer_cap.repo;
let _ = Self::open(&repo_id, storage)?;
storage.write_transaction(&mut |tx| {
let id_ser = to_vec(&repo_id)?;
let value = to_vec(signer_cap)?;
tx.put(Self::PREFIX, &id_ser, Some(Self::SIGNER_CAP), &value, &None)?;
Ok(())
})?;
Ok(())
}
pub fn create(
id: &RepoId,
read_cap: &ReadCap,

@ -10,7 +10,11 @@
//! Storage of user application data (RDF, content of rich-text document, etc)
use ng_repo::{
block_storage::BlockStorage, errors::StorageError, repo::Repo, store::Store, types::*,
block_storage::BlockStorage,
errors::StorageError,
repo::{BranchInfo, Repo},
store::Store,
types::*,
};
use crate::types::*;
@ -36,6 +40,10 @@ pub trait UserStorage: Send + Sync {
fn load_repo(&self, repo_id: &RepoId, store: Arc<Store>) -> Result<Repo, StorageError>;
fn save_repo(&self, repo: &Repo) -> Result<(), StorageError>;
fn add_branch(&self, repo_id: &RepoId, branch_info: &BranchInfo) -> Result<(), StorageError>;
fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError>;
}
pub(crate) struct InMemoryUserStorage {
@ -77,4 +85,12 @@ impl UserStorage for InMemoryUserStorage {
fn save_repo(&self, repo: &Repo) -> Result<(), StorageError> {
unimplemented!();
}
fn add_branch(&self, repo_id: &RepoId, branch_info: &BranchInfo) -> Result<(), StorageError> {
unimplemented!();
}
fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError> {
unimplemented!();
}
}

@ -8,16 +8,16 @@
// according to those terms.
//! Repo object (on heap) to handle a Repository
use crate::commits::*;
use crate::types::*;
use crate::user_storage::repo::RepoStorage;
use ng_net::actor::SoS;
use ng_net::broker::{Broker, BROKER};
use ng_repo::log::*;
use ng_repo::object::Object;
use ng_repo::repo::BranchInfo;
use ng_repo::{
block_storage::BlockStorage,
errors::{NgError, ProtocolError, ServerError, StorageError},
errors::{NgError, ProtocolError, ServerError, StorageError, VerifierError},
file::RandomAccessFile,
repo::Repo,
store::Store,
@ -48,6 +48,19 @@ use serde::{Deserialize, Serialize};
use web_time::SystemTime;
//use yrs::{StateVector, Update};
// pub trait IVerifier {
// fn add_branch_and_save(
// &mut self,
// repo_id: &RepoId,
// branch_info: BranchInfo,
// store_repo: &StoreRepo,
// ) -> Result<(), VerifierError>;
// fn add_repo_and_save(&mut self, repo: Repo) -> &Repo;
// fn get_repo(&self, id: &RepoId, store_repo: &StoreRepo) -> Result<&Repo, NgError>;
// }
pub struct Verifier {
pub config: VerifierConfig,
pub connected_server_id: Option<PubKey>,
@ -60,7 +73,12 @@ pub struct Verifier {
last_reservation: SystemTime,
stores: HashMap<OverlayId, Arc<Store>>,
repos: HashMap<RepoId, Repo>,
topics: HashMap<(OverlayId, TopicId), (RepoId, BranchId)>,
// TODO: deal with collided repo_ids. self.repos should be a HashMap<RepoId,Collision> enum Collision {Yes, No(Repo)}
// add a collided_repos: HashMap<(OverlayId, RepoId), Repo>
// only use get_repo() everywhere in the code (always passing the overlay) so that collisions can be handled.
// also do the same in RocksdbStorage
/// (OverlayId, TopicId), (RepoId, BranchId)
pub(crate) topics: HashMap<(OverlayId, TopicId), (RepoId, BranchId)>,
/// only used for InMemory type, to store the outbox
in_memory_outbox: Vec<EventOutboxStorage>,
}
@ -79,6 +97,10 @@ struct EventOutboxStorage {
}
impl Verifier {
pub fn user_privkey(&self) -> &PrivKey {
&self.config.user_priv_key
}
#[allow(deprecated)]
#[cfg(any(test, feature = "testing"))]
pub fn new_dummy() -> Self {
@ -183,24 +205,59 @@ impl Verifier {
.stores
.remove(&overlay_id)
.ok_or(NgError::StoreNotFound)?;
// let mut repo = self
// .repos
// .remove(store_repo.repo_id())
// .ok_or(NgError::RepoNotFound)?;
// if repo_already_inserted {
// let mut repo = self
// .repos
// .remove(store_repo.repo_id())
// .ok_or(NgError::RepoNotFound)?;
// log_info!(
// "{}",
// Arc::<ng_repo::store::Store>::strong_count(&repo.store)
// );
// }
drop(repo.store);
//log_info!("{}", Arc::<ng_repo::store::Store>::strong_count(&store));
let mut mut_store = Arc::<ng_repo::store::Store>::into_inner(store).unwrap();
mut_store.set_read_caps(read_cap, overlay_read_cap);
let new_store = Arc::new(mut_store);
let _ = self.stores.insert(overlay_id, Arc::clone(&new_store));
repo.store = new_store;
// if repo_already_inserted {
// let _ = self.repos.insert(*store_repo.repo_id(), repo);
// }
Ok(repo)
}
pub fn complete_site_store_already_inserted(
&mut self,
store_repo: StoreRepo,
) -> Result<(), NgError> {
let overlay_id = store_repo.overlay_id_for_storage_purpose();
let store = self
.stores
.remove(&overlay_id)
.ok_or(NgError::StoreNotFound)?;
let mut repo = self.repos.remove(store.id()).ok_or(NgError::RepoNotFound)?;
// log_info!(
// "{}",
// Arc::<ng_repo::store::Store>::strong_count(&repo.store)
// );
let read_cap = repo.read_cap.to_owned().unwrap();
let overlay_read_cap = repo.overlay_branch_read_cap().cloned();
drop(repo.store);
//log_info!("{}", Arc::<ng_repo::store::Store>::strong_count(&store));
let mut mut_store = Arc::<ng_repo::store::Store>::into_inner(store).unwrap();
mut_store.set_read_caps(read_cap, overlay_read_cap);
let new_store = Arc::new(mut_store);
let _ = self.stores.insert(overlay_id, Arc::clone(&new_store));
// TODO: store in user_storage
repo.store = new_store;
//let _ = self.repos.insert(*store_repo.repo_id(), repo);
Ok(repo)
let _ = self.repos.insert(*store_repo.repo_id(), repo);
Ok(())
}
pub fn get_store(&self, store_repo: &StoreRepo) -> Result<Arc<Store>, NgError> {
@ -213,9 +270,9 @@ impl Verifier {
&mut self,
id: &RepoId,
store_repo: &StoreRepo,
) -> Result<&mut Repo, NgError> {
) -> Result<&mut Repo, VerifierError> {
let store = self.get_store(store_repo);
let repo_ref = self.repos.get_mut(id).ok_or(NgError::RepoNotFound);
let repo_ref = self.repos.get_mut(id).ok_or(VerifierError::RepoNotFound);
// .or_insert_with(|| {
// // load from storage
// Repo {
@ -232,12 +289,6 @@ impl Verifier {
repo_ref
}
pub fn get_repo(&self, id: RepoId, store_repo: &StoreRepo) -> Result<&Repo, NgError> {
//let store = self.get_store(store_repo);
let repo_ref = self.repos.get(&id).ok_or(NgError::RepoNotFound);
repo_ref
}
pub fn add_store(&mut self, store: Arc<Store>) {
let overlay_id = store.get_store_repo().overlay_id_for_storage_purpose();
if self.stores.contains_key(&overlay_id) {
@ -299,7 +350,7 @@ impl Verifier {
let publisher = self.config.peer_priv_key.clone();
self.last_seq_num += 1;
let seq_num = self.last_seq_num;
let repo = self.get_repo(repo_id, store_repo)?;
let repo = self.get_repo(&repo_id, store_repo)?;
let event = Event::new(&publisher, seq_num, commit, additional_blocks, repo)?;
self.send_or_save_event_to_outbox(event, repo.store.inner_overlay())
@ -443,7 +494,7 @@ impl Verifier {
event: Event,
overlay: OverlayId,
) -> Result<(), NgError> {
log_debug!("========== EVENT {:03}: {}", event.seq_num(), event);
//log_info!("========== EVENT {:03}: {}", event.seq_num(), event);
if self.connected_server_id.is_some() {
// send the event to the server already
@ -586,8 +637,315 @@ impl Verifier {
Ok(())
}
pub fn deliver(&mut self, event: Event) {}
pub fn verify_commit(
&mut self,
commit: Commit,
store: Arc<Store>,
) -> Result<(), VerifierError> {
//let quorum_type = commit.quorum_type();
// log_info!(
// "VERIFYING {} {} {:?}",
// store.get_store_repo(),
// commit,
// store
// );
match commit.body().ok_or(VerifierError::CommitBodyNotFound)? {
CommitBody::V0(v0) => match v0 {
CommitBodyV0::Repository(a) => a.verify(&commit, self, store),
CommitBodyV0::RootBranch(a) => a.verify(&commit, self, store),
CommitBodyV0::Branch(a) => a.verify(&commit, self, store),
CommitBodyV0::SyncSignature(a) => a.verify(&commit, self, store),
CommitBodyV0::AddBranch(a) => a.verify(&commit, self, store),
CommitBodyV0::StoreUpdate(a) => a.verify(&commit, self, store),
CommitBodyV0::AddSignerCap(a) => a.verify(&commit, self, store),
_ => unimplemented!(),
},
}
}
pub(crate) fn add_branch_and_save(
&mut self,
repo_id: &RepoId,
branch_info: BranchInfo,
store_repo: &StoreRepo,
) -> Result<(), VerifierError> {
let user_storage = self
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
user_storage.unwrap().add_branch(repo_id, &branch_info)?;
}
let branch_id = branch_info.id.clone();
let topic_id = branch_info.topic.clone();
let repo = self.get_repo_mut(repo_id, store_repo)?;
let res = repo.branches.insert(branch_info.id.clone(), branch_info);
assert!(res.is_none());
let overlay_id: OverlayId = repo.store.inner_overlay();
let repo_id = repo_id.clone();
let res = self
.topics
.insert((overlay_id, topic_id), (repo_id, branch_id));
assert_eq!(res, None);
Ok(())
}
pub(crate) fn update_branch(
&self,
repo_id: &RepoId,
branch_id: &BranchId,
store_repo: &StoreRepo,
) -> Result<(), VerifierError> {
let user_storage = self
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
let repo = self.get_repo(repo_id, store_repo)?;
user_storage
.unwrap()
.add_branch(repo_id, repo.branch(branch_id)?)?;
}
Ok(())
}
pub(crate) fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), VerifierError> {
let user_storage = self
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
if user_storage.is_some() {
user_storage.unwrap().update_signer_cap(signer_cap)?;
}
Ok(())
}
pub(crate) fn add_repo_and_save(&mut self, repo: Repo) -> &Repo {
let user_storage = self
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
let repo_ref: &Repo = self.add_repo_(repo);
// save in user_storage
if user_storage.is_some() {
let _ = user_storage.unwrap().save_repo(repo_ref);
}
repo_ref
}
pub(crate) fn get_repo(&self, id: &RepoId, store_repo: &StoreRepo) -> Result<&Repo, NgError> {
//let store = self.get_store(store_repo);
let repo_ref = self.repos.get(id).ok_or(NgError::RepoNotFound);
repo_ref
}
fn load_from_credentials_and_events(
&mut self,
events: &Vec<EventOutboxStorage>,
) -> Result<(), NgError> {
let private_store_id = self.config.private_store_id.as_ref().unwrap();
let private_outer_overlay_id = OverlayId::outer(private_store_id);
let private_inner_overlay_id = OverlayId::inner(
private_store_id,
&self.config.private_store_read_cap.as_ref().unwrap().key,
);
// let storage = self.block_storage.as_ref().unwrap().write().unwrap();
let store_repo = StoreRepo::new_private(*private_store_id);
let store = Arc::new(Store::new(
store_repo,
self.config.private_store_read_cap.to_owned().unwrap(),
self.config.private_store_read_cap.to_owned().unwrap(),
Arc::clone(
&self
.block_storage
.as_ref()
.ok_or(core::fmt::Error)
.expect("couldn't get the block_storage"),
),
));
let store = self
.stores
.entry(private_outer_overlay_id)
.or_insert_with(|| store);
let private_store = Arc::clone(store);
// for e in events {
// if e.overlay == private_inner_overlay_id {
// // it is an event about the private store
// // we will load only the commits on the root branch.
// let load = if let Ok(repo) =
// self.get_repo(private_store.id(), private_store.get_store_repo())
// {
// if let Some(root) = repo.root_branch() {
// root.topic == *e.event.topic_id()
// } else {
// true
// }
// } else {
// true
// };
// if !load {
// continue;
// }
// let commit = e.event.open(
// &private_store,
// private_store.id(),
// private_store.id(),
// private_store.get_store_readcap_secret(),
// )?;
// self.verify_commit(commit, Arc::clone(&private_store))?;
// }
// }
// let repo = self.get_repo(private_store.id(), private_store.get_store_repo())?;
// let root_topic = repo
// .root_branch()
// .ok_or(VerifierError::RootBranchNotFound)?
// .topic;
// 2nd pass: load all the other branches of the private store repo.
// 1st pass: load all events about private store
let mut postponed_signer_caps = Vec::with_capacity(3);
for e in events {
if e.overlay == private_inner_overlay_id {
// it is an event about the private store
//log_info!("VERIFYING EVENT {} {}", e.overlay, e.event);
let (branch_id, branch_secret) =
match self.get_repo(private_store.id(), private_store.get_store_repo()) {
Err(_) => (private_store.id(), private_store.get_store_readcap_secret()),
Ok(repo) => {
let (_, branch_id) = self
.topics
.get(&(e.overlay, *e.event.topic_id()))
.ok_or(VerifierError::TopicNotFound)?;
let branch = repo.branch(branch_id)?;
(branch_id, &branch.read_cap.key)
}
};
let commit =
e.event
.open(&private_store, private_store.id(), branch_id, branch_secret)?;
if commit
.body()
.ok_or(VerifierError::CommitBodyNotFound)?
.is_add_signer_cap()
{
postponed_signer_caps.push(commit);
} else {
self.verify_commit(commit, Arc::clone(&private_store))?;
}
}
}
// for e in events {
// if e.overlay == private_inner_overlay_id {
// // it is an event about the private store
// // we will load only the commits that are not on the root branch.
// if root_topic == *e.event.topic_id() {
// continue;
// }
// let repo = self.get_repo(private_store.id(), private_store.get_store_repo())?;
// let (_, branch_id) = self
// .topics
// .get(&(e.overlay, *e.event.topic_id()))
// .ok_or(VerifierError::TopicNotFound)?;
// let branch = repo.branch(branch_id)?;
// let commit = e.event.open_with_info(repo, branch)?;
// if commit
// .body()
// .ok_or(VerifierError::CommitBodyNotFound)?
// .is_add_signer_cap()
// {
// postponed_signer_caps.push(commit);
// } else {
// self.verify_commit(commit, Arc::clone(&private_store))?;
// }
// }
// }
//log_info!("{:?}\n{:?}\n{:?}", self.repos, self.stores, self.topics);
// 2nd pass : load the other events (that are not from private store)
for (overlay, store) in self.stores.clone().iter() {
//log_info!("TRYING OVERLAY {} {}", overlay, private_outer_overlay_id);
if *overlay == private_outer_overlay_id {
//log_info!("SKIPPED");
continue;
// we skip the private store, as we already loaded it
}
let store_inner_overlay_id = store.inner_overlay();
for e in events {
if e.overlay == store_inner_overlay_id {
// it is an event about the store we are loading
//log_info!("VERIFYING EVENT {} {}", e.overlay, e.event);
let (branch_id, branch_secret) =
match self.get_repo(store.id(), store.get_store_repo()) {
Err(_) => (store.id(), store.get_store_readcap_secret()),
Ok(repo) => {
let (_, branch_id) = self
.topics
.get(&(e.overlay, *e.event.topic_id()))
.ok_or(VerifierError::TopicNotFound)?;
let branch = repo.branch(branch_id)?;
(branch_id, &branch.read_cap.key)
}
};
let commit = e.event.open(store, store.id(), branch_id, branch_secret)?;
self.verify_commit(commit, Arc::clone(store))?;
} else {
// log_info!(
// "SKIPPED wrong overlay {} {}",
// e.overlay,
// store_inner_overlay_id
// );
}
}
}
// let list: Vec<(OverlayId, StoreRepo)> = self
// .stores
// .iter()
// .map(|(o, s)| (o.clone(), s.get_store_repo().clone()))
// .collect();
// for (overlay, store_repo) in list {
// if overlay == private_outer_overlay_id {
// continue;
// // we skip the private store, as we already loaded it
// }
// self.complete_site_store_already_inserted(store_repo)?;
// }
// finally, ingest the signer_caps.
for signer_cap in postponed_signer_caps {
self.verify_commit(signer_cap, Arc::clone(&private_store))?;
}
Ok(())
}
pub async fn send_outbox(&mut self) -> Result<(), NgError> {
let events = self.take_events_from_outbox()?;
let events: Vec<EventOutboxStorage> = self.take_events_from_outbox()?;
let broker = BROKER.write().await;
let user = self.config.user_priv_key.to_pub();
let remote = self
@ -595,6 +953,54 @@ impl Verifier {
.as_ref()
.ok_or(NgError::NotConnected)?
.clone();
// for all the events, check that they are valid (topic exists, current_heads match with event)
let mut need_replay = false;
let mut branch_heads: HashMap<BranchId, Vec<ObjectRef>> = HashMap::new();
for e in events.iter() {
match self.topics.get(&(e.overlay, *e.event.topic_id())) {
Some((repo_id, branch_id)) => match self.repos.get(repo_id) {
Some(repo) => match repo.branches.get(branch_id) {
Some(branch) => {
let commit = e.event.open_with_info(repo, branch)?;
let acks = commit.acks();
match branch_heads.insert(*branch_id, vec![commit.reference().unwrap()])
{
Some(previous_heads) => {
if previous_heads != acks {
need_replay = true;
break;
}
}
None => {
if acks != branch.current_heads {
need_replay = true;
break;
}
}
}
}
None => {
need_replay = true;
break;
}
},
None => {
need_replay = true;
break;
}
},
None => {
need_replay = true;
break;
}
}
}
log_info!("NEED REPLAY {need_replay}");
if need_replay {
self.load_from_credentials_and_events(&events)?;
}
for e in events {
self.send_event(e.event, &broker, &user, &remote, e.overlay)
.await?;
@ -738,20 +1144,6 @@ impl Verifier {
self.add_repo_(repo);
}
fn add_repo_and_save(&mut self, repo: Repo) -> &Repo {
let user_storage = self
.user_storage
.as_ref()
.map(|us| Arc::clone(us))
.and_then(|u| if self.is_persistent() { Some(u) } else { None });
let repo_ref: &Repo = self.add_repo_(repo);
// save in user_storage
if user_storage.is_some() {
let _ = user_storage.unwrap().save_repo(repo_ref);
}
repo_ref
}
fn add_repo_(&mut self, repo: Repo) -> &Repo {
for (branch_id, info) in repo.branches.iter() {
//log_info!("LOADING BRANCH: {}", branch_id);
@ -795,6 +1187,26 @@ impl Verifier {
Ok(())
}
pub(crate) fn new_store_from_update(
&mut self,
update: &StoreUpdate,
) -> Result<(), VerifierError> {
let store = Store::new_from(
update,
Arc::clone(
self.block_storage
.as_ref()
.ok_or(VerifierError::NoBlockStorageAvailable)?,
),
);
let overlay_id = store.get_store_repo().overlay_id_for_storage_purpose();
let store = self
.stores
.entry(overlay_id)
.or_insert_with(|| Arc::new(store));
Ok(())
}
pub async fn new_store_default<'a>(
&'a mut self,
creator: &UserId,

Loading…
Cancel
Save