diff --git a/Cargo.lock b/Cargo.lock index 5ea468c7..97135531 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2752,10 +2752,11 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.64" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f195fe497f702db0f318b07fdd68edb16955aed830df8363d837542f8f935a" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" dependencies = [ + "once_cell", "wasm-bindgen", ] @@ -3353,6 +3354,7 @@ dependencies = [ "async-std", "async-trait", "base64-url", + "crypto_box", "ed25519-dalek", "either", "futures", @@ -3402,7 +3404,7 @@ dependencies = [ "serde", "sha1", "sha2 0.10.8", - "siphasher 0.3.10", + "siphasher 1.0.1", "thiserror", "zstd", ] @@ -6606,12 +6608,13 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.37" +version = "0.4.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c02dbc21516f9f1f04f187958890d7e6026df8d16540b7ad9492bc34a67cea03" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" dependencies = [ "cfg-if", "js-sys", + "once_cell", "wasm-bindgen", "web-sys", ] @@ -6701,9 +6704,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.64" +version = "0.3.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b85cbef8c220a6abc02aefd892dfc0fc23afb1c6a426316ec33253a3877249b" +checksum = "33b6dd2ef9186f1f2072e409e99cd22a975331a6b3591b12c764e0e55c60d5d2" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/helpers/ngaccount/.env b/helpers/ngaccount/.env index fb63cde8..689de724 100644 --- a/helpers/ngaccount/.env +++ b/helpers/ngaccount/.env @@ -1,5 +1,5 @@ -NG_ACCOUNT_DOMAIN= -NG_ACCOUNT_ADMIN= -NG_ACCOUNT_LOCAL_PEER_KEY= -NG_ACCOUNT_SERVER=127.0.0.1,14400,[the broker's peer ID] -RUST_LOG= +export NG_ACCOUNT_DOMAIN=test.com +export NG_ACCOUNT_ADMIN=vC1HC5YZQppsHUkerBe2oXoNjAlleuvkpXpdUIWnRO8A +export NG_ACCOUNT_LOCAL_PEER_KEY=5Q2FBa_VrUSGgJtsKc9Zstrtr0PyZYEgmeGxgtjrIo4A +export NG_ACCOUNT_SERVER=127.0.0.1,14400,95h47CSgbyHyHU4NAiaBd3hE76VONaQHlG9Mx0aJOLUA +export RUST_LOG=debug diff --git a/helpers/ngaccount/README.md b/helpers/ngaccount/README.md index e1044738..dc66aa03 100644 --- a/helpers/ngaccount/README.md +++ b/helpers/ngaccount/README.md @@ -21,11 +21,11 @@ edit your `.env` file as follow `NG_ACCOUNT_LOCAL_PEER_KEY` is given as an example. You can generate a random one by using the command `ngcli gen-key` and use the private key. ``` -NG_ACCOUNT_DOMAIN=test.com -NG_ACCOUNT_ADMIN=[YOUR_USER_PRIV_KEY] -NG_ACCOUNT_LOCAL_PEER_KEY=kbz34OFqaWu59xYaqViP0esME2MmcroS94pc4lEEsEsA -NG_ACCOUNT_SERVER=127.0.0.1,14400,[YOUR_NGD_PEER_ID] -RUST_LOG=debug +export NG_ACCOUNT_DOMAIN=test.com +export NG_ACCOUNT_ADMIN=[YOUR_USER_PRIV_KEY] +export NG_ACCOUNT_LOCAL_PEER_KEY=kbz34OFqaWu59xYaqViP0esME2MmcroS94pc4lEEsEsA +export NG_ACCOUNT_SERVER=127.0.0.1,14400,[YOUR_NGD_PEER_ID] +export RUST_LOG=debug ``` diff --git a/helpers/wasm-tools/Cargo.toml b/helpers/wasm-tools/Cargo.toml index dd708782..75880103 100644 --- a/helpers/wasm-tools/Cargo.toml +++ b/helpers/wasm-tools/Cargo.toml @@ -36,7 +36,7 @@ sys-locale = { version = "0.3.1", features = ["js"] } [target.'cfg(target_arch = "wasm32")'.dependencies] serde-wasm-bindgen = "0.6" -wasm-bindgen-futures = "0.4.34" +wasm-bindgen-futures = "0.4.50" [dev-dependencies] wasm-bindgen-test = "^0.3" diff --git a/nextgraph/src/local_broker.rs b/nextgraph/src/local_broker.rs index c85ddcba..74f13e9b 100644 --- a/nextgraph/src/local_broker.rs +++ b/nextgraph/src/local_broker.rs @@ -567,21 +567,28 @@ impl fmt::Debug for LocalBroker { } #[doc(hidden)] -#[async_trait::async_trait] +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] pub trait ILocalBroker: Send + Sync + EActor { async fn deliver(&mut self, event: Event, overlay: OverlayId, user: UserId); - + async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool); async fn user_disconnected(&mut self, user_id: UserId); } -// used to deliver events to the verifier on Clients, or Core that have Verifiers attached. -#[async_trait::async_trait] +// used to deliver events to the verifier on Clients, or on Cores that have Verifiers attached. +#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))] +#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)] impl ILocalBroker for LocalBroker { async fn deliver(&mut self, event: Event, overlay: OverlayId, user_id: UserId) { if let Some(session) = self.get_mut_session_for_user(&user_id) { session.verifier.deliver(event, overlay).await; } } + async fn inbox(&mut self, user_id: UserId, msg: InboxMsg, from_queue: bool) { + if let Some(session) = self.get_mut_session_for_user(&user_id) { + session.verifier.inbox(msg, from_queue).await; + } + } async fn user_disconnected(&mut self, user_id: UserId) { if let Some(session) = self.get_mut_session_for_user(&user_id) { session.verifier.connection_lost(); @@ -629,6 +636,9 @@ async fn pump( overlay, user, } => broker.deliver(event, overlay, user).await, + LocalBrokerMessage::Inbox {msg, user_id, from_queue} => { + broker.inbox(user_id, msg, from_queue).await + }, LocalBrokerMessage::Disconnected { user_id } => broker.user_disconnected(user_id).await, } } @@ -700,7 +710,7 @@ impl LocalBroker { .await .connect( Arc::new(Box::new(ConnectionWebSocket {})), - config.client_peer_key.as_ref().unwrap().clone(), + config.client_peer_key.to_owned().unwrap(), config.client_peer_key.as_ref().unwrap().to_pub(), config.server_peer_id, StartConfig::App(AppConfig { @@ -1074,7 +1084,7 @@ impl LocalBroker { as Arc> } }; - let client = wallet.client().as_ref().unwrap().clone(); + let client = wallet.client().to_owned().unwrap(); let opened_wallet = OpenedWallet { wallet, block_storage, @@ -2489,6 +2499,12 @@ pub async fn user_connect_with_device_info( tried.as_mut().unwrap().3 = Some(e.to_string()); } else { local_broker.start_pump().await; + + // try to pop inbox msg + let broker = BROKER.read().await; + broker + .send_client_event(&Some(*user), &Some(server_key), ClientEvent::InboxPopRequest) + .await?; } break; } else { diff --git a/ng-app/src/apps/SocialQueryDemo.svelte b/ng-app/src/apps/SocialQueryDemo.svelte new file mode 100644 index 00000000..d320efb1 --- /dev/null +++ b/ng-app/src/apps/SocialQueryDemo.svelte @@ -0,0 +1,97 @@ + + + +
+

Social Query

+ {#if !source} +

{$t("doc.no_triples")}

+ {/if} + + + + {#if source} + + + + {/if} + +
+ \ No newline at end of file diff --git a/ng-app/src/classes.ts b/ng-app/src/classes.ts index 0d852b53..2afd9d3b 100644 --- a/ng-app/src/classes.ts +++ b/ng-app/src/classes.ts @@ -784,6 +784,25 @@ export const official_classes = { "ng:n": "Live", "ng:a": "A live session of video or audio, with optional chat", }, + "social:profile:skills:programming": { + "ng:crdt": "Graph", + "ng:n": "Skills Profile", + "ng:a": "Define a profile of skills on programming, abd you or other users", + }, + "social:query:skills:programming": { + "ng:crdt": "Graph", + "ng:n": "Find programmers", + "ng:a": "Social query for Skills on programming", + "ng:o": "n:g:z:social_query_skills_programming_editor", // default viewer + "ng:w": "n:g:z:social_query_skills_programming_editor", // default editor + "implemented": true + }, + "social:query:forwarded": { + "ng:crdt": "Graph", + "ng:n": "Forwarded Social Query", + "ng:a": "An internal document to store the progress of a forwarded social query", + "implemented": true + }, "prod:task": { "ng:crdt": "Graph", "ng:n": "Task", diff --git a/ng-app/src/lib/FullLayout.svelte b/ng-app/src/lib/FullLayout.svelte index 1cbefd81..d1246887 100644 --- a/ng-app/src/lib/FullLayout.svelte +++ b/ng-app/src/lib/FullLayout.svelte @@ -272,7 +272,8 @@ "social:channel", "social:scheduler", "social:calendar", - "social:live" + "social:live", + "social:query:skills:programming", ]; const create_pro_items = [ @@ -1035,7 +1036,7 @@ {#if createMenuOpened.social } {#each create_social_items as item} - + new_document(item) : undefined }> {get_class(item)["ng:n"]} diff --git a/ng-app/src/lib/NoWallet.svelte b/ng-app/src/lib/NoWallet.svelte index 0941992a..264b261c 100644 --- a/ng-app/src/lib/NoWallet.svelte +++ b/ng-app/src/lib/NoWallet.svelte @@ -34,7 +34,7 @@ {@html $t("pages.no_wallet.description")}

- If you had created a wallet before 17 April 2025, it doesn't work anymore. please create a new wallet. We are still in alpha, and this can happen again. + If you had created a wallet before 8 May 2025, it doesn't work anymore. please create a new wallet. We are still in alpha, and this can happen again.
diff --git a/ng-app/src/lib/icons/DataClassIcon.svelte b/ng-app/src/lib/icons/DataClassIcon.svelte index 799586e3..4655c56e 100644 --- a/ng-app/src/lib/icons/DataClassIcon.svelte +++ b/ng-app/src/lib/icons/DataClassIcon.svelte @@ -75,6 +75,9 @@ Variable, Language, QueueList, + Rss, + Lifebuoy, + Star } from "svelte-heros-v2"; import PdfIcon from "./PdfIcon.svelte"; @@ -135,6 +138,9 @@ "social:reaction": Heart, "social:chatroom": ChatBubbleLeftRight, "social:live": Fire, + "social:query:forwarded": Rss, + "social:query:skills:programming": Lifebuoy, + "social:profile:skills:programming": Star, "prod:task": Stop, "prod:project": Flag, "prod:issue": HandRaised, diff --git a/ng-app/src/lib/icons/ZeraIcon.svelte b/ng-app/src/lib/icons/ZeraIcon.svelte index 898f66c6..10c6c8cc 100644 --- a/ng-app/src/lib/icons/ZeraIcon.svelte +++ b/ng-app/src/lib/icons/ZeraIcon.svelte @@ -72,6 +72,7 @@ Eye, Square3Stack3d, QueueList, + Lifebuoy, } from "svelte-heros-v2"; import JsonIcon from "./JsonIcon.svelte"; import JsonLdIcon from "./JsonLdIcon.svelte"; @@ -113,6 +114,7 @@ list: ListBullet, grid: Squares2x2, view: Eye, + social_query: Lifebuoy, }; const prefix_mapping = {}; diff --git a/ng-app/src/main-web.ts b/ng-app/src/main-web.ts index 4c31164c..2ef4cde0 100644 --- a/ng-app/src/main-web.ts +++ b/ng-app/src/main-web.ts @@ -16,10 +16,10 @@ if (!import.meta.env.TAURI_PLATFORM) { // cleaning old wallets :( try { let version = localStorage.getItem("ng_wallet_version"); - if (!version || version != "0.1.1") { + if (!version || version != "0.1.2-alpha.1") { localStorage.clear(); sessionStorage.clear(); - localStorage.setItem("ng_wallet_version","0.1.1") + localStorage.setItem("ng_wallet_version","0.1.2-alpha.1") } } catch (e) { diff --git a/ng-app/src/zeras.ts b/ng-app/src/zeras.ts index 9833048c..15af029c 100644 --- a/ng-app/src/zeras.ts +++ b/ng-app/src/zeras.ts @@ -92,6 +92,17 @@ export const official_apps = { "ng:w": [], implemented: true, }, + "n:g:z:social_query_skills_programming_editor": { + "ng:n": "Social Query", + "ng:a": "Create and View your Social Query about Programming Skills", + "ng:c": "app", + "ng:u": "social_query",//favicon. can be a did:ng:j + "ng:g": "n:g:z:social_query_skills_programming_editor", + "ng:b": "SocialQueryDemo", + "ng:o": ["social:query:skills:programming"], + "ng:w": ["social:query:skills:programming"], + implemented: true, + }, "n:g:z:sparql_query": { "ng:n": "SPARQL Query", "ng:a": "View, edit and invoke a Graph SPARQL query", diff --git a/ng-broker/src/rocksdb_server_storage.rs b/ng-broker/src/rocksdb_server_storage.rs index dc1b6f0b..efe5395e 100644 --- a/ng-broker/src/rocksdb_server_storage.rs +++ b/ng-broker/src/rocksdb_server_storage.rs @@ -146,6 +146,8 @@ impl RocksDbServerStorage { core_storage.add_class(&RepoHashStorage::CLASS); core_storage.add_class(&OverlayStorage::CLASS); core_storage.add_class(&CommitStorage::CLASS); + core_storage.add_class(&InboxStorage::CLASS); + core_storage.add_class(&AccountStorage::CLASS); core_storage.check_prefixes(); } @@ -276,6 +278,38 @@ impl RocksDbServerStorage { inv.del()?; Ok(()) } + pub(crate) fn get_inboxes_for_readers(&self, user: &UserId) -> Result,StorageError> { + AccountStorage::load_inboxes(user, &self.core_storage) + } + + pub(crate) fn take_first_msg_from_inbox( + &self, + inbox: &PubKey, + overlay: &OverlayId + ) -> Result { + InboxStorage::take_first_msg(inbox, overlay, &self.core_storage) + } + + pub(crate) fn get_readers_for_inbox( + &self, + inbox: &PubKey, + overlay: &OverlayId + ) -> Result, StorageError> { + InboxStorage::load_readers(inbox, overlay, &self.core_storage) + } + + pub(crate) fn register_inbox_reader(&self, user_id: UserId, inbox_id: PubKey, overlay: OverlayId) -> Result<(), StorageError> { + InboxStorage::register_reader(&inbox_id, &overlay, &user_id, &self.core_storage)?; + AccountStorage::add_inbox(&user_id, inbox_id, overlay, &self.core_storage) + } + + pub(crate) fn enqueue_inbox_msg( + &self, + msg: &InboxMsg + ) -> Result<(), StorageError> { + InboxStorage::open(&msg.body.to_inbox, &msg.body.to_overlay, &self.core_storage)?.enqueue_msg(msg) + } + pub(crate) fn get_repo_pin_status( &self, overlay: &OverlayId, diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index d9305aac..b7a17715 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -822,6 +822,46 @@ impl IServerBroker for ServerBroker { } } + async fn inbox_post(&self, post: InboxPost) -> Result<(), ServerError> { + + // TODO: deal with Inbox that is not local to the broker (use Core protocol to dispatch it) + + let users = self.storage.get_readers_for_inbox(&post.msg.body.to_inbox, &post.msg.body.to_overlay)?; + if users.is_empty() { + self.storage.enqueue_inbox_msg(&post.msg)?; + return Ok(()) + } + + let broker = BROKER.read().await; + let not_dispatched = broker + .dispatch_inbox_msg(&users, post.msg) + .await?; + if let Some(msg) = not_dispatched { + self.storage.enqueue_inbox_msg(&msg)?; + } + Ok(()) + } + + fn inbox_register(&self, user_id: UserId, registration: InboxRegister) -> Result<(), ServerError> { + + self.storage.register_inbox_reader(user_id, registration.inbox_id, registration.overlay)?; + Ok(()) + } + + async fn inbox_pop_for_user(&self, user: UserId ) -> Result { + let inboxes = self.storage.get_inboxes_for_readers(&user)?; + + for (inbox,overlay) in inboxes { + match self.storage.take_first_msg_from_inbox(&inbox, &overlay) { + Ok(msg) => { + return Ok(msg) + }, + Err(_) => {} + } + } + Err(ServerError::NotFound) + } + async fn dispatch_event( &self, overlay: &OverlayId, diff --git a/ng-broker/src/server_storage/core/account.rs b/ng-broker/src/server_storage/core/account.rs new file mode 100644 index 00000000..79d52bca --- /dev/null +++ b/ng-broker/src/server_storage/core/account.rs @@ -0,0 +1,95 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +//! Account Storage (Object Key/Col/Value Mapping) + +use std::collections::HashSet; +use std::hash::{DefaultHasher, Hash, Hasher}; + +use ng_net::types::InboxMsg; +use ng_repo::utils::now_precise_timestamp; +use serde_bare::to_vec; + +use ng_repo::errors::StorageError; +use ng_repo::kcv_storage::*; +use ng_repo::types::*; + +pub struct AccountStorage<'a> { + key: Vec, + storage: &'a dyn KCVStorage, +} + +impl<'a> IModel for AccountStorage<'a> { + fn key(&self) -> &Vec { + &self.key + } + fn storage(&self) -> &dyn KCVStorage { + self.storage + } + fn class(&self) -> &Class { + &Self::CLASS + } + fn existential(&mut self) -> Option<&mut dyn IExistentialValue> { + None + } +} + +impl<'a> AccountStorage<'a> { + // User <-> Inboxes : list of inboxes a user has registered as reader. + // FIXME: this should be in accounts storage, but because it doesn't implement the ORM yet, it is quicker to implement it here. + pub const INBOXES: MultiValueColumn = MultiValueColumn::new(b'k'); + + pub const CLASS: Class<'a> = Class::new( + "Account", + None, + None, + &[], + &[&Self::INBOXES as &dyn IMultiValueColumn], + ); + + pub fn load_inboxes( + user: &UserId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let mut opening = Self::new(user, storage); + Self::INBOXES.get_all(&mut opening) + } + + pub fn new(user: &UserId, storage: &'a dyn KCVStorage) -> Self { + let mut key: Vec = Vec::with_capacity(33); + key.append(&mut to_vec(user).unwrap()); + Self { key, storage } + } + + pub fn open( + user: &UserId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let opening = Self::new(user, storage); + Ok(opening) + } + + pub fn add_inbox( + user: &UserId, + inbox: PubKey, + overlay: OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result<(), StorageError> { + let mut opening = Self::new(user, storage); + Self::INBOXES.add(&mut opening, &(inbox,overlay)) + } + + pub fn create( + user: &UserId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let creating = Self::new(user, storage); + Ok(creating) + } +} diff --git a/ng-broker/src/server_storage/core/inbox.rs b/ng-broker/src/server_storage/core/inbox.rs new file mode 100644 index 00000000..84196f24 --- /dev/null +++ b/ng-broker/src/server_storage/core/inbox.rs @@ -0,0 +1,120 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +//! Inbox Storage (Object Key/Col/Value Mapping) + +use std::collections::HashSet; +use std::hash::{DefaultHasher, Hash, Hasher}; + +use ng_net::types::InboxMsg; +use ng_repo::utils::now_precise_timestamp; +use serde_bare::to_vec; + +use ng_repo::errors::StorageError; +use ng_repo::kcv_storage::*; +use ng_repo::types::*; + +pub struct InboxStorage<'a> { + key: Vec, + storage: &'a dyn KCVStorage, +} + +impl<'a> IModel for InboxStorage<'a> { + fn key(&self) -> &Vec { + &self.key + } + fn storage(&self) -> &dyn KCVStorage { + self.storage + } + fn class(&self) -> &Class { + &Self::CLASS + } + fn existential(&mut self) -> Option<&mut dyn IExistentialValue> { + None + } +} + +// seconds, nanosecs, hash of InboxMsgBody +type MsgKeySuffix = (u64, u32, u64); + +impl<'a> InboxStorage<'a> { + // Inbox <-> Msg : list of incoming messages that will be delivered once a user is online + pub const MSGS: MultiMapColumn = MultiMapColumn::new(b'm'); + // Inbox <-> User : list of users who registered as readers of an inbox + pub const READERS: MultiValueColumn = MultiValueColumn::new(b'i'); + + pub const CLASS: Class<'a> = Class::new( + "Inbox", + None, + None, + &[], + &[&Self::MSGS as &dyn IMultiValueColumn, &Self::READERS], + ); + + pub fn take_first_msg( + inbox: &PubKey, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result { + let mut opening = Self::new(inbox, overlay, storage); + Self::MSGS.take_first_value(&mut opening) + } + + pub fn load_readers( + inbox: &PubKey, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let mut opening = Self::new(inbox, overlay, storage); + Self::READERS.get_all(&mut opening) + } + + pub fn new(inbox: &PubKey, overlay: &OverlayId, storage: &'a dyn KCVStorage) -> Self { + let mut key: Vec = Vec::with_capacity(33 + 33); + key.append(&mut to_vec(overlay).unwrap()); + key.append(&mut to_vec(inbox).unwrap()); + Self { key, storage } + } + + pub fn open( + inbox: &PubKey, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let opening = Self::new(inbox, overlay, storage); + Ok(opening) + } + + pub fn register_reader( + inbox: &PubKey, + overlay: &OverlayId, + reader: &UserId, + storage: &'a dyn KCVStorage, + ) -> Result<(), StorageError> { + let mut opening = Self::new(inbox, overlay, storage); + Self::READERS.add(&mut opening, reader) + } + + pub fn enqueue_msg(&mut self, msg: &InboxMsg) -> Result<(), StorageError> { + let (sec,nano) = now_precise_timestamp(); + let mut hasher = DefaultHasher::new(); + msg.body.hash(&mut hasher); + let key = (sec,nano, hasher.finish()); + Self::MSGS.add(self, &key,msg) + } + + pub fn create( + inbox: &PubKey, + overlay: &OverlayId, + storage: &'a dyn KCVStorage, + ) -> Result, StorageError> { + let creating = Self::new(inbox, overlay, storage); + Ok(creating) + } +} diff --git a/ng-broker/src/server_storage/core/mod.rs b/ng-broker/src/server_storage/core/mod.rs index bb68d394..0fabdedb 100644 --- a/ng-broker/src/server_storage/core/mod.rs +++ b/ng-broker/src/server_storage/core/mod.rs @@ -12,3 +12,9 @@ pub use repo::*; pub mod commit; pub use commit::*; + +pub mod inbox; +pub use inbox::*; + +pub mod account; +pub use account::*; diff --git a/ng-net/Cargo.toml b/ng-net/Cargo.toml index b753dc79..90e43406 100644 --- a/ng-net/Cargo.toml +++ b/ng-net/Cargo.toml @@ -32,6 +32,7 @@ unique_id = "0.1.5" noise-protocol = "0.2.0" noise-rust-crypto = "0.6.2" ed25519-dalek = "1.0.1" +crypto_box = { version = "0.8.2", features = ["seal"] } url = "2.4.0" regex = "1.8.4" base64-url = "2.0.0" diff --git a/ng-net/src/actors/client/client_event.rs b/ng-net/src/actors/client/client_event.rs new file mode 100644 index 00000000..7edda7fc --- /dev/null +++ b/ng-net/src/actors/client/client_event.rs @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +use std::sync::Arc; + +use async_std::sync::Mutex; + +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::types::OverlayId; + +use crate::broker::BROKER; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; + +impl ClientEvent { + pub fn get_actor(&self, id: i64) -> Box { + Actor::::new_responder(id) + } +} + +impl TryFrom for ClientEvent { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + + if let ProtocolMessage::ClientMessage(ClientMessage::V0(ClientMessageV0 { + content: ClientMessageContentV0::ClientEvent(e), + .. + })) = msg + { + Ok(e) + } else { + log_debug!("INVALID {:?}", msg); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(e: ClientEvent) -> ProtocolMessage { + ProtocolMessage::ClientMessage(ClientMessage::V0(ClientMessageV0 { + content: ClientMessageContentV0::ClientEvent(e), + overlay: OverlayId::nil(), + padding: vec![] + })) + } +} + +impl Actor<'_, ClientEvent, ()> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, ClientEvent, ()> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = ClientEvent::try_from(msg)?; + match req { + ClientEvent::InboxPopRequest => { + let sb = { BROKER.read().await.get_server_broker()? }; + let user = {fsm.lock().await.user_id()?}; + let res: Result = { + sb.read().await.inbox_pop_for_user(user).await + }; + + if let Ok(msg) = res { + let _ = fsm + .lock() + .await + .send(ProtocolMessage::ClientMessage(ClientMessage::V0( + ClientMessageV0 { + overlay: msg.body.to_overlay.clone(), + padding: vec![], + content: ClientMessageContentV0::InboxReceive{msg, from_queue: true}, + }, + ))) + .await; + } + } + } + + Ok(()) + } +} diff --git a/ng-net/src/actors/client/inbox_post.rs b/ng-net/src/actors/client/inbox_post.rs new file mode 100644 index 00000000..1a07fcfd --- /dev/null +++ b/ng-net/src/actors/client/inbox_post.rs @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +use std::sync::Arc; + +use async_std::sync::Mutex; + +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::types::OverlayId; + +use crate::broker::BROKER; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; + +impl InboxPost { + pub fn get_actor(&self, id: i64) -> Box { + Actor::::new_responder(id) + } +} + +impl TryFrom for InboxPost { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let req: ClientRequestContentV0 = msg.try_into()?; + if let ClientRequestContentV0::InboxPost(a) = req { + Ok(a) + } else { + log_debug!("INVALID {:?}", req); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(msg: InboxPost) -> ProtocolMessage { + ProtocolMessage::from_client_request_v0( + ClientRequestContentV0::InboxPost(msg), + OverlayId::nil(), + ) + } +} + +impl Actor<'_, InboxPost, ()> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, InboxPost, ()> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = InboxPost::try_from(msg)?; + let sb = { BROKER.read().await.get_server_broker()? }; + let res: Result<(), ServerError> = sb + .read() + .await.inbox_post(req).await; + + fsm.lock() + .await + .send_in_reply_to(res.into(), self.id()) + .await?; + Ok(()) + } +} diff --git a/ng-net/src/actors/client/inbox_register.rs b/ng-net/src/actors/client/inbox_register.rs new file mode 100644 index 00000000..7283abaa --- /dev/null +++ b/ng-net/src/actors/client/inbox_register.rs @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers + * All rights reserved. + * Licensed under the Apache License, Version 2.0 + * + * or the MIT license , + * at your option. All files in the project carrying such + * notice may not be copied, modified, or distributed except + * according to those terms. +*/ + +use std::sync::Arc; + +use async_std::sync::Mutex; + +use ng_repo::errors::*; +use ng_repo::log::*; +use ng_repo::types::OverlayId; +use ng_repo::utils::verify; + +use crate::broker::BROKER; +use crate::connection::NoiseFSM; +use crate::types::*; +use crate::{actor::*, types::ProtocolMessage}; + +impl InboxRegister { + pub fn get_actor(&self, id: i64) -> Box { + Actor::::new_responder(id) + } +} + +impl TryFrom for InboxRegister { + type Error = ProtocolError; + fn try_from(msg: ProtocolMessage) -> Result { + let req: ClientRequestContentV0 = msg.try_into()?; + if let ClientRequestContentV0::InboxRegister(a) = req { + Ok(a) + } else { + log_debug!("INVALID {:?}", req); + Err(ProtocolError::InvalidValue) + } + } +} + +impl From for ProtocolMessage { + fn from(msg: InboxRegister) -> ProtocolMessage { + ProtocolMessage::from_client_request_v0( + ClientRequestContentV0::InboxRegister(msg), + OverlayId::nil(), + ) + } +} + +impl Actor<'_, InboxRegister, ()> {} + +#[async_trait::async_trait] +impl EActor for Actor<'_, InboxRegister, ()> { + async fn respond( + &mut self, + msg: ProtocolMessage, + fsm: Arc>, + ) -> Result<(), ProtocolError> { + let req = InboxRegister::try_from(msg)?; + + // verify registration + if verify(&req.challenge, req.sig, req.inbox_id).is_err() { + fsm.lock() + .await + .send_in_reply_to(Result::<(), _>::Err(ServerError::InvalidSignature).into(), self.id()) + .await?; + return Ok(()) + } + + let sb = { BROKER.read().await.get_server_broker()? }; + + let user_id = { + let fsm = fsm.lock().await; + fsm.user_id()? + }; + + let res: Result<(), ServerError> = sb + .read() + .await.inbox_register(user_id, req); + + fsm.lock() + .await + .send_in_reply_to(res.into(), self.id()) + .await?; + Ok(()) + } +} diff --git a/ng-net/src/actors/client/mod.rs b/ng-net/src/actors/client/mod.rs index 4d535392..05f341dd 100644 --- a/ng-net/src/actors/client/mod.rs +++ b/ng-net/src/actors/client/mod.rs @@ -17,3 +17,9 @@ pub mod blocks_exist; pub mod blocks_get; pub mod wallet_put_export; + +pub mod inbox_post; + +pub mod inbox_register; + +pub mod client_event; \ No newline at end of file diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 1c9f4af8..d93b33f4 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -9,8 +9,6 @@ //! App Protocol (between LocalBroker and Verifier) -use lazy_static::lazy_static; -use regex::Regex; use serde::{Deserialize, Serialize}; use ng_repo::errors::NgError; @@ -23,31 +21,6 @@ use ng_repo::utils::{decode_overlayid, display_timestamp_local}; use crate::types::*; -lazy_static! { - #[doc(hidden)] - static ref RE_FILE_READ_CAP: Regex = - Regex::new(r"^did:ng:j:([A-Za-z0-9-_]*):k:([A-Za-z0-9-_]*)$").unwrap(); - #[doc(hidden)] - static ref RE_REPO_O: Regex = - Regex::new(r"^did:ng:o:([A-Za-z0-9-_]*)$").unwrap(); - #[doc(hidden)] - static ref RE_REPO: Regex = - Regex::new(r"^did:ng:o:([A-Za-z0-9-_]*):v:([A-Za-z0-9-_]*)$").unwrap(); - #[doc(hidden)] - static ref RE_BRANCH: Regex = - Regex::new(r"^did:ng:o:([A-Za-z0-9-_]*):v:([A-Za-z0-9-_]*):b:([A-Za-z0-9-_]*)$").unwrap(); - #[doc(hidden)] - static ref RE_NAMED_BRANCH_OR_COMMIT: Regex = - Regex::new(r"^did:ng:o:([A-Za-z0-9-_]*):v:([A-Za-z0-9-_]*):a:([A-Za-z0-9-_%]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char - #[doc(hidden)] - static ref RE_OBJECTS: Regex = - Regex::new(r"^did:ng(?::o:([A-Za-z0-9-_]{44}))?:v:([A-Za-z0-9-_]{44})((?::[cj]:[A-Za-z0-9-_]{44}:k:[A-Za-z0-9-_]{44})+)(?::s:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44}))?:l:([A-Za-z0-9-_]*)$").unwrap(); - #[doc(hidden)] - static ref RE_OBJECT_READ_CAPS: Regex = - Regex::new(r":[cj]:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44})").unwrap(); - -} - #[derive(Clone, Debug, Serialize, Deserialize)] pub enum AppFetchContentV0 { Get, // does not subscribe. @@ -81,7 +54,8 @@ pub enum NgAccessV0 { #[serde(with = "serde_bytes")] ExtRequest(Vec), Key(BlockKey), - Inbox(PubKey), + Inbox(PrivKey), + Topic(PrivKey), } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -122,8 +96,12 @@ impl TargetBranchV0 { pub enum NuriTargetV0 { UserSite, // targets the whole data set of the user - PublicStore, - ProtectedStore, + PublicProfile, + PublicStore(RepoId), + ProtectedProfile, + ProtectedStore(RepoId), + GroupStore(RepoId), + DialogStore(RepoId), PrivateStore, AllDialogs, Dialog(String), // shortname of a Dialog @@ -131,6 +109,7 @@ pub enum NuriTargetV0 { Group(String), // shortname of a Group Repo(RepoId), + Inbox(PubKey), None, } @@ -238,6 +217,18 @@ impl NuriV0 { format!("{DID_PREFIX}:c:{commit_base64}:v:{overlay_id}") } + pub fn get_first_commit_ref(&self) -> Result { + let commit_id = match &self.branch { + Some(TargetBranchV0::Commits(commits)) => commits.get(0).ok_or(NgError::CommitNotFound)?, + _ => return Err(NgError::InvalidNuri) + }; + let commit_key = match self.access.get(0) { + Some(NgAccessV0::Key(key)) => key, + _ => return Err(NgError::InvalidNuri) + }; + Ok(ObjectRef::from_id_key(*commit_id, commit_key.clone())) + } + pub fn from_store_repo(store_repo: &StoreRepo) -> Self { NuriV0 { identity: None, @@ -326,6 +317,42 @@ impl NuriV0 { format!("{DID_PREFIX}:o:{repo_id}:c:{commit_id}") } + pub fn inbox(inbox_id: &PubKey) -> String { + format!("{DID_PREFIX}:d:{inbox_id}") + } + + pub fn from_store_repo_string(store_repo: &StoreRepo) -> String { + match store_repo { + StoreRepo::V0(v0) => match v0 { + StoreRepoV0::PublicStore(id) => NuriV0::public_profile(id), + StoreRepoV0::ProtectedStore(id) => NuriV0::protected_profile(id), + StoreRepoV0::PrivateStore(id) => NuriV0::private_store(id), + StoreRepoV0::Group(id) => NuriV0::group_store(id), + StoreRepoV0::Dialog((id,_)) => NuriV0::dialog_store(id), + }, + } + } + + pub fn public_profile(store_id: &PubKey) -> String { + format!("{DID_PREFIX}:a:{store_id}") + } + + pub fn protected_profile(store_id: &PubKey) -> String { + format!("{DID_PREFIX}:b:{store_id}") + } + + pub fn private_store(store_id: &PubKey) -> String { + format!("{DID_PREFIX}:c:{store_id}") + } + + pub fn group_store(store_id: &PubKey) -> String { + format!("{DID_PREFIX}:g:{store_id}") + } + + pub fn dialog_store(store_id: &PubKey) -> String { + format!("{DID_PREFIX}:d:{store_id}") + } + pub fn locator(locator: &Locator) -> String { format!("l:{locator}") } @@ -380,6 +407,11 @@ impl NuriV0 { locator: None, }) } + pub fn new_repo_target_from_id(repo_id: &RepoId) -> Self { + let mut n = Self::new_empty(); + n.target = NuriTargetV0::Repo(*repo_id); + n + } pub fn new_from_obj_ref(obj_ref: &ObjectRef) -> Self { Self { @@ -397,32 +429,27 @@ impl NuriV0 { } pub fn new_private_store_target() -> Self { - Self { - identity: None, - target: NuriTargetV0::PrivateStore, - entire_store: false, - objects: vec![], - signature: None, - branch: None, - overlay: None, - access: vec![], - topic: None, - locator: None, - } + let mut n = Self::new_empty(); + n.target = NuriTargetV0::PrivateStore; + n + } + + pub fn new_protected_store_target() -> Self { + let mut n = Self::new_empty(); + n.target = NuriTargetV0::ProtectedProfile; + n + } + + pub fn new_public_store_target() -> Self { + let mut n = Self::new_empty(); + n.target = NuriTargetV0::PublicProfile; + n } + pub fn new_entire_user_site() -> Self { - Self { - identity: None, - target: NuriTargetV0::UserSite, - entire_store: false, - objects: vec![], - signature: None, - branch: None, - overlay: None, - access: vec![], - topic: None, - locator: None, - } + let mut n = Self::new_empty(); + n.target = NuriTargetV0::UserSite; + n } pub fn new_for_readcaps(from: &str) -> Result { let c = RE_OBJECTS.captures(from); @@ -480,6 +507,98 @@ impl NuriV0 { } } + pub fn from_inbox_into_id(from: &String) -> Result { + let c = RE_INBOX.captures(&from); + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + { + let cap = c.unwrap(); + let d = cap.get(1).unwrap().as_str(); + let to_inbox = decode_key(d)?; + return Ok(to_inbox); + } + Err(NgError::InvalidNuri) + } + + pub fn from_profile_into_overlay_id(from: &String) -> Result { + let c = RE_PROFILE.captures(&from); + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let to_profile_id = decode_key(o)?; + let to_overlay = OverlayId::outer(&to_profile_id); + return Ok(to_overlay); + } + Err(NgError::InvalidNuri) + } + + pub fn new_from_repo_graph(from: &String) -> Result { + let c = RE_REPO.captures(from); + + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + && c.as_ref().unwrap().get(2).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let v = cap.get(2).unwrap().as_str(); + let repo_id = decode_key(o)?; + let overlay_id = decode_overlayid(v)?; + + let mut n = Self::new_empty(); + n.target = NuriTargetV0::Repo(repo_id); + n.overlay = Some(overlay_id.into()); + return Ok(n); + } + Err(NgError::InvalidNuri) + } + + pub fn new_from_commit(from: &String) -> Result { + + let c = RE_COMMIT.captures(&from); + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + && c.as_ref().unwrap().get(2).is_some() + && c.as_ref().unwrap().get(3).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let c = cap.get(2).unwrap().as_str(); + let k = cap.get(3).unwrap().as_str(); + let repo_id = decode_key(o)?; + let commit_id = decode_digest(c)?; + let commit_key = decode_sym_key(k)?; + return Ok(Self { + identity: None, + target: NuriTargetV0::Repo(repo_id), + entire_store: false, + objects: vec![], + signature: None, + branch: Some(TargetBranchV0::Commits(vec![commit_id])), + overlay: None, + access: vec![NgAccessV0::Key(commit_key)], + topic: None, + locator: None, + }); + } + Err(NgError::InvalidNuri) + } + + pub fn from_repo_nuri_to_id(from: &String) -> Result { + let c = RE_REPO_O.captures(from); + + if c.is_some() && c.as_ref().unwrap().get(1).is_some() { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + + let repo_id = decode_key(o)?; + return Ok(repo_id); + } + Err(NgError::InvalidNuri) + } + pub fn new_from(from: &String) -> Result { let c = RE_REPO_O.captures(from); @@ -524,30 +643,9 @@ impl NuriV0 { locator: None, }) } else { - let c = RE_REPO.captures(from); - - if c.is_some() - && c.as_ref().unwrap().get(1).is_some() - && c.as_ref().unwrap().get(2).is_some() - { - let cap = c.unwrap(); - let o = cap.get(1).unwrap().as_str(); - - let v = cap.get(2).unwrap().as_str(); - let repo_id = decode_key(o)?; - let overlay_id = decode_overlayid(v)?; - Ok(Self { - identity: None, - target: NuriTargetV0::Repo(repo_id), - entire_store: false, - objects: vec![], - signature: None, - branch: None, - overlay: Some(overlay_id.into()), - access: vec![], - topic: None, - locator: None, - }) + + if let Ok(n) = NuriV0::new_from_repo_graph(from) { + Ok(n) } else { let c = RE_BRANCH.captures(from); @@ -594,19 +692,16 @@ pub enum AppRequestCommandV0 { FileGet, // needs the Nuri of branch/doc/store AND ObjectId FilePut, // needs the Nuri of branch/doc/store Header, + InboxPost, + SocialQueryStart, + SocialQueryCancel, } impl AppRequestCommandV0 { pub fn is_stream(&self) -> bool { match self { Self::Fetch(AppFetchContentV0::Subscribe) | Self::FileGet => true, - Self::FilePut - | Self::Create - | Self::Delete - | Self::UnPin - | Self::Pin - | Self::Header - | Self::Fetch(_) => false, + _ => false, } } pub fn new_read_query() -> Self { @@ -689,6 +784,41 @@ impl AppRequest { }) } + pub fn inbox_post(post: InboxPost) -> Self { + AppRequest::new( + AppRequestCommandV0::InboxPost, + NuriV0::new_empty(), + Some(AppRequestPayload::V0(AppRequestPayloadV0::InboxPost(post))), + ) + } + + pub fn social_query_start( + from_profile: NuriV0, + query: NuriV0, + contacts: String, + degree: u16, + ) -> Self { + AppRequest::new( + AppRequestCommandV0::SocialQueryStart, + query, + Some(AppRequestPayload::V0(AppRequestPayloadV0::SocialQueryStart{ + from_profile, + contacts, + degree + })), + ) + } + + pub fn social_query_cancel( + query: NuriV0, + ) -> Self { + AppRequest::new( + AppRequestCommandV0::SocialQueryCancel, + query, + None + ) + } + pub fn doc_fetch_repo_subscribe(repo_o: String) -> Result { Ok(AppRequest::new( AppRequestCommandV0::Fetch(AppFetchContentV0::get_or_subscribe(true)), @@ -876,6 +1006,13 @@ pub enum AppRequestPayloadV0 { RandomAccessFilePutChunk((u32, serde_bytes::ByteBuf)), // end the upload with an empty vec Header(DocHeader), + + InboxPost(InboxPost), + SocialQueryStart { + from_profile: NuriV0, + contacts: String, + degree: u16, + }, //RemoveFile //Invoke(InvokeArguments), } diff --git a/ng-net/src/broker.rs b/ng-net/src/broker.rs index 31e5ed27..2c52e12e 100644 --- a/ng-net/src/broker.rs +++ b/ng-net/src/broker.rs @@ -124,6 +124,11 @@ pub enum LocalBrokerMessage { Disconnected { user_id: UserId, }, + Inbox{ + user_id: UserId, + msg: InboxMsg, + from_queue: bool, + }, } pub static BROKER: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(Broker::new()))); @@ -1092,6 +1097,25 @@ impl Broker { Ok(()) } + pub async fn send_client_event< + A: Into + std::fmt::Debug + Sync + Send + 'static, + >( + &self, + user: &Option, + remote_peer_id: &Option, // None means local + msg: A, + ) -> Result<(), NgError> { + let bpi = self + .peers + .get(&(*user, remote_peer_id.map(|rpi| rpi.to_dh_slice()))) + .ok_or(NgError::ConnectionNotFound)?; + match &bpi.connected { + PeerConnection::Client(cnx) => cnx.send_client_event(msg).await, + PeerConnection::Local(lt) => lt.client_cnx.send_client_event(msg).await, + _ => Err(NgError::BrokerError), + } + } + pub async fn request< A: Into + std::fmt::Debug + Sync + Send + 'static, B: TryFrom + std::fmt::Debug + Sync + Send + 'static, @@ -1188,6 +1212,43 @@ impl Broker { Ok(clients_to_remove) } + #[cfg(not(target_arch = "wasm32"))] + pub async fn dispatch_inbox_msg( + &self, + users: &HashSet, + msg: InboxMsg, + ) -> Result, ServerError> { + + for user in users.iter() { + if let Some(peers) = self.users_peers.get(user) { + for peer in peers.iter() { + if peer.is_some() { + if let Some(BrokerPeerInfo { + connected: PeerConnection::Client(ConnectionBase { fsm: Some(fsm), .. }), + .. + }) = self.peers.get(&(None, Some(peer.to_owned().unwrap()))) + { + //let fsm = Arc::clone(fsm); + let _ = fsm + .lock() + .await + .send(ProtocolMessage::ClientMessage(ClientMessage::V0( + ClientMessageV0 { + overlay: msg.body.to_overlay.clone(), + padding: vec![], + content: ClientMessageContentV0::InboxReceive{msg, from_queue: false}, + }, + ))) + .await; + return Ok(None); + } + } + } + } + } + Ok(Some(msg)) + } + #[doc(hidden)] pub async fn close_peer_connection_x( &mut self, diff --git a/ng-net/src/connection.rs b/ng-net/src/connection.rs index 09ab768d..b4f3e0ce 100644 --- a/ng-net/src/connection.rs +++ b/ng-net/src/connection.rs @@ -1031,18 +1031,37 @@ impl NoiseFSM { } None => { if let ProtocolMessage::ClientMessage(cm) = msg { - if let Some((event, overlay)) = cm.forwarded_event() { - let _ = BROKER - .read() - .await - .get_local_broker()? - .send(LocalBrokerMessage::Deliver { - event, - overlay, - user: self.user_id()?, - }) - .await; - return Ok(StepReply::NONE); + let overlay = cm.overlay_id(); + match cm { + ClientMessage::V0(o) => match o.content { + ClientMessageContentV0::ForwardedEvent(event) => { + let _ = BROKER + .read() + .await + .get_local_broker()? + .send(LocalBrokerMessage::Deliver { + event, + overlay, + user: self.user_id()?, + }) + .await; + return Ok(StepReply::NONE); + }, + ClientMessageContentV0::InboxReceive{msg, from_queue} => { + let _ = BROKER + .read() + .await + .get_local_broker()? + .send(LocalBrokerMessage::Inbox { + msg, + user_id: self.user_id()?, + from_queue + }) + .await; + return Ok(StepReply::NONE); + } + _ => {}, + }, } } } @@ -1050,6 +1069,7 @@ impl NoiseFSM { } } } + log_err!("reached end of FSM"); Err(ProtocolError::InvalidState) } } @@ -1304,6 +1324,14 @@ impl ConnectionBase { Ok(()) } + pub async fn send_client_event< + A: Into + std::fmt::Debug + Sync + Send + 'static, + >(&self, msg: A) -> Result<(), NgError> { + let proto_msg: ProtocolMessage = msg.into(); + self.fsm.as_ref().unwrap().lock().await.send(proto_msg).await?; + Ok(()) + } + pub async fn request< A: Into + std::fmt::Debug + Sync + Send + 'static, B: TryFrom + std::fmt::Debug + Sync + Send + 'static, diff --git a/ng-net/src/server_broker.rs b/ng-net/src/server_broker.rs index cac8ea34..74eee21d 100644 --- a/ng-net/src/server_broker.rs +++ b/ng-net/src/server_broker.rs @@ -40,6 +40,9 @@ pub trait IServerBroker: Send + Sync { &self, rendezvous: SymKey, ) -> Receiver>; + async fn inbox_post(&self, post: InboxPost) -> Result<(), ServerError>; + fn inbox_register(&self, user_id: UserId, registration: InboxRegister) -> Result<(), ServerError>; + async fn inbox_pop_for_user(&self, user: UserId ) -> Result; fn get_path_users(&self) -> PathBuf; fn get_block_storage(&self) -> Arc>; fn put_block(&self, overlay_id: &OverlayId, block: Block) -> Result<(), ServerError>; diff --git a/ng-net/src/types.rs b/ng-net/src/types.rs index 0a32d874..d32b1c78 100644 --- a/ng-net/src/types.rs +++ b/ng-net/src/types.rs @@ -13,6 +13,7 @@ use core::fmt; use std::collections::HashSet; +use std::sync::Arc; use std::{ any::{Any, TypeId}, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}, @@ -25,16 +26,58 @@ use ng_repo::errors::*; use ng_repo::log::*; use ng_repo::store::Store; use ng_repo::types::*; -use ng_repo::utils::{sign, verify}; +use ng_repo::utils::{random_key, sign, verify, decode_digest, decode_key, decode_sym_key, decode_priv_key, decode_overlayid}; use crate::app_protocol::*; use crate::utils::{ get_domain_without_port_443, is_ipv4_private, is_ipv6_private, is_private_ip, is_public_ip, - is_public_ipv4, is_public_ipv6, + is_public_ipv4, is_public_ipv6, decode_locator }; use crate::WS_PORT_ALTERNATE; use crate::{actor::EActor, actors::admin::*, actors::*}; +use lazy_static::lazy_static; +use regex::Regex; + +lazy_static! { + #[doc(hidden)] + pub(crate) static ref RE_FILE_READ_CAP: Regex = + Regex::new(r"^did:ng:j:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_REPO_O: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_REPO: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_]{44}):v:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_BRANCH: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_]{44}):v:([A-Za-z0-9-_]{44}):b:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_NAMED_BRANCH_OR_COMMIT: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_]{44}):v:([A-Za-z0-9-_]{44}):a:([A-Za-z0-9-_%]*)$").unwrap(); //TODO: allow international chars. disallow digit as first char + #[doc(hidden)] + pub(crate) static ref RE_OBJECTS: Regex = + Regex::new(r"^did:ng(?::o:([A-Za-z0-9-_]{44}))?:v:([A-Za-z0-9-_]{44})((?::[cj]:[A-Za-z0-9-_]{44}:k:[A-Za-z0-9-_]{44})+)(?::s:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44}))?:l:([A-Za-z0-9-_]*)$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_OBJECT_READ_CAPS: Regex = + Regex::new(r":[cj]:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44})").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_FROM_PUBLIC_PROFILE_INBOX: Regex = + Regex::new(r"^did:ng:a:([A-Za-z0-9-_]{44}):p:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_COMMIT: Regex = + Regex::new(r"^did:ng:o:([A-Za-z0-9-_]{44}):c:([A-Za-z0-9-_]{44}):k:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_INBOX_OVERLAY: Regex = + Regex::new(r"^did:ng:d:([A-Za-z0-9-_]{44}):v:([A-Za-z0-9-_]{44})(:l:([A-Za-z0-9-_]*))?$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_INBOX: Regex = + Regex::new(r"^did:ng:d:([A-Za-z0-9-_]{44})$").unwrap(); + #[doc(hidden)] + pub(crate) static ref RE_PROFILE: Regex = + Regex::new(r"^did:ng:[ab]:([A-Za-z0-9-_]{44})$").unwrap(); +} + #[derive(Debug, Clone, Serialize, Deserialize)] /// used to initiate a session at a local broker V0 pub struct Credentials { @@ -2136,8 +2179,8 @@ pub enum InnerOverlayMessageContentV0 { SubMarker(SubMarker), UnsubReq(UnsubReq), Event(Event), - //PostInboxRequest(PostInboxRequest), - //PostInboxResponse(PostInboxResponse), + //InboxPostRequest(InboxPostRequest), + //InboxPostResponse(InboxPostResponse), } /// Inner Overlay message payload V0 @@ -2371,7 +2414,7 @@ pub enum CoreDirectMessageContentV0 { ReturnPathTimingAdvert(ReturnPathTimingAdvert), BlocksGet(CoreBlocksGet), BlockResult(CoreBlockResult), - //PostInbox, + //InboxPost, //PartialSignature, //ClientDirectMessage //for messages between forwarded or direct peers } @@ -2451,7 +2494,7 @@ pub enum OuterOverlayResponseContentV0 { EmptyResponse(()), Block(Block), TopicSyncRes(TopicSyncRes), - //PostInboxResponse(PostInboxResponse), + //InboxPostResponse(InboxPostResponse), } /// Content of OuterOverlayRequest V0 @@ -2462,7 +2505,7 @@ pub enum OuterOverlayRequestContentV0 { TopicSub(PubKey), TopicUnsub(PubKey), BlocksGet(BlocksGet), - //PostInboxRequest(PostInboxRequest), + //InboxPostRequest(InboxPostRequest), } /// OuterOverlayRequestV0 V0 @@ -3588,6 +3631,139 @@ impl ObjectDel { } } +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct InboxRegister { + pub inbox_id: PubKey, + pub overlay: OverlayId, + // TODO: obtain challenge from Broker + pub challenge: [u8; 32], + // signature of challenge by inbox privkey + pub sig: Sig +} + +impl InboxRegister { + pub fn new(inbox: PrivKey, overlay: OverlayId) -> Result { + let challenge = random_key(); + let inbox_id = inbox.to_pub(); + let sig = sign(&inbox,&inbox_id, &challenge)?; + Ok(Self { + inbox_id, + overlay, + challenge, + sig + }) + } +} + + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct InboxPost { + pub msg: InboxMsg, + /// optional Locator for destination inbox, in case broker doesn't know where to find inbox + pub to_broker: Option, +} + +impl InboxPost { + pub fn new( + to_overlay: OverlayId, + to_inbox: PubKey, + from: Option<(OverlayId,PrivKey)>, + content:&InboxMsgContent, + blocks: Vec, + to_broker: Option + ) -> Result + { + Ok(Self { + msg: InboxMsg::new(to_overlay,to_inbox,from,content,blocks)?, + to_broker + }) + } + + pub fn new_social_query_response( + to_overlay: OverlayId, + to_inbox: PubKey, + from: Option<(OverlayId,PrivKey)>, + query_id: RepoId, + forwarder_id: RepoId, + content: SocialQueryResponseContent + ) -> Result { + let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content })); + Self::new(to_overlay, to_inbox, from, &content, vec![], None) + } + + pub fn new_social_query_response_replying_to( + msg: &InboxMsgBody, + request: &SocialQueryRequest, + content: SocialQueryResponseContent, + inbox_privkey: PrivKey, + ) -> Result { + let to_overlay = msg.from_overlay.ok_or(NgError::InvalidArgument)?; + let to_inbox = msg.from_inbox.ok_or(NgError::InvalidArgument)?; + if msg.to_inbox != inbox_privkey.to_pub() { return Err(NgError::InvalidArgument); } + let from = Some((msg.to_overlay, inbox_privkey)); + let query_id = request.query_id; + let forwarder_id = request.forwarder_id; + let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content })); + Self::new(to_overlay, to_inbox, from, &content, vec![], None) + } + + /// to_profile_nuri = did:ng:[ab] + /// to_inbox_nuri = did:ng:d + pub fn new_social_query_request( + from_profile_store_repo: StoreRepo, + from_inbox: PrivKey, + forwarder_id: RepoId, + to_profile_nuri: String, + to_inbox_nuri: String, + to_broker: Option, + query_id: RepoId, + definition_commit_body_ref: ObjectRef, + blocks: Vec, + degree: u16, + ) -> Result { + + // processing to_profile_nuri + let c = RE_PROFILE.captures(&to_profile_nuri); + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + { + let cap = c.unwrap(); + let o = cap.get(1).unwrap().as_str(); + let to_profile_id = decode_key(o)?; + let to_overlay = OverlayId::outer(&to_profile_id); + + // processing to_inbox_nuri + let c = RE_INBOX.captures(&to_inbox_nuri); + if c.is_some() + && c.as_ref().unwrap().get(1).is_some() + { + let cap = c.unwrap(); + let d = cap.get(1).unwrap().as_str(); + let to_inbox = decode_key(d)?; + let from_overlay = from_profile_store_repo.outer_overlay(); + let content = InboxMsgContent::SocialQuery(SocialQuery::Request(SocialQueryRequest{ + query_id, + forwarder_id, + from_profile_store_repo, + degree, + definition_commit_body_ref, + })); + + return Ok(InboxPost::new( + to_overlay, + to_inbox, + Some((from_overlay,from_inbox)), + &content, + blocks, + to_broker + )?); + } + } + Err(NgError::InvalidNuri) + } + +} + /// Request to publish an event in pubsub #[derive(Clone, Debug, Serialize, Deserialize)] pub struct PublishEvent(pub Event, #[serde(skip)] pub Option); @@ -3619,6 +3795,9 @@ pub enum ClientRequestContentV0 { PublishEvent(PublishEvent), WalletPutExport(WalletPutExport), + + InboxRegister(InboxRegister), + InboxPost(InboxPost), } impl ClientRequestContentV0 { @@ -3627,6 +3806,8 @@ impl ClientRequestContentV0 { ClientRequestContentV0::RepoPinStatusReq(a) => a.set_overlay(overlay), ClientRequestContentV0::TopicSub(a) => a.set_overlay(overlay), ClientRequestContentV0::PinRepo(_a) => {} + ClientRequestContentV0::InboxRegister(_a) => {} + ClientRequestContentV0::InboxPost(_a) => {} ClientRequestContentV0::PublishEvent(a) => a.set_overlay(overlay), ClientRequestContentV0::CommitGet(a) => a.set_overlay(overlay), ClientRequestContentV0::TopicSyncReq(a) => a.set_overlay(overlay), @@ -3686,6 +3867,8 @@ impl ClientRequest { ClientRequestContentV0::BlocksExist(r) => r.get_actor(self.id()), ClientRequestContentV0::BlocksGet(r) => r.get_actor(self.id()), ClientRequestContentV0::WalletPutExport(r) => r.get_actor(self.id()), + ClientRequestContentV0::InboxRegister(r) => r.get_actor(self.id()), + ClientRequestContentV0::InboxPost(r) => r.get_actor(self.id()), _ => unimplemented!(), }, } @@ -3956,6 +4139,156 @@ impl TryFrom for ClientResponseContentV0 { } } +/// Starts a new Social Query +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SocialQueryRequest { + /// Query ID + pub query_id: RepoId, + + /// Forwarder ID + pub forwarder_id: RepoId, + + /// Profile ID (must match the from_overlay) + pub from_profile_store_repo: StoreRepo, + + /// degree of forwarding in the social network + /// gets decremented at every hop + /// 0 means unlimited + /// 1 means stop here (after processing locally, do not forward) + pub degree: u16, + + /// Definition in RDF. the blocks are added in InboxMsg.blocks + pub definition_commit_body_ref: ObjectRef, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum SocialQueryResponseContent { + True, + False, + Graph(Vec), + QueryResult(Vec), + EndOfReplies, + AlreadyRequested, + Error(u16), +} + +/// Response to a `SocialQueryRequest` +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct SocialQueryResponse { + /// Query ID + pub query_id: RepoId, + + /// Forwarder ID + pub forwarder_id: RepoId, + + /// Response content + pub content: SocialQueryResponseContent, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum SocialQuery { + Request(SocialQueryRequest), + Response(SocialQueryResponse), + Cancel(RepoId), + Delete(RepoId), +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum InboxMsgContent { + ContactDetails, + DialogRequest, + Link, + Patch, + ServiceRequest, + ExtRequest, + RemoteQuery, + SocialQuery(SocialQuery), + //Transaction + //Comment + //BackLink +} + +/// InboxMsgBody +#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq)] +pub struct InboxMsgBody { + + pub to_overlay: OverlayId, + pub to_inbox: PubKey, + + pub from_overlay: Option, + pub from_inbox: Option, + + /// crypto_box_sealed of InboxMsgContent serialization, encrypted to the to_inbox pub key + #[serde(with = "serde_bytes")] + pub msg: Vec, +} + +/// InboxMsg +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct InboxMsg { + + pub body: InboxMsgBody, + + /// optional signature by sender (from_inbox pubkey), over body + pub sig: Option, + + /// optional blocks that should be sent with the message + pub blocks: Vec, +} + +impl InboxMsg { + pub fn new( + to_overlay: OverlayId, + to_inbox: PubKey, + from: Option<(OverlayId,PrivKey)>, + content:&InboxMsgContent, + blocks: Vec + ) -> Result { + let ser = serde_bare::to_vec(content).unwrap(); + let mut rng = crypto_box::aead::OsRng {}; + let msg = crypto_box::seal(&mut rng, &to_inbox.to_dh_slice().into(), &ser) + .map_err(|_| NgError::EncryptionError)?; + let body = InboxMsgBody { + to_overlay, + to_inbox, + from_overlay: from.as_ref().map(|(o,_)|o.clone()), + from_inbox: from.as_ref().map(|(_,i)|i.to_pub()), + msg + }; + let sig = match from { + Some((_,inbox)) => { + let ser = serde_bare::to_vec(&body).unwrap(); + Some(sign( + &inbox, + body.from_inbox.as_ref().unwrap(), + &ser, + )?)}, + None=>None + }; + Ok( + Self { + body, + sig, + blocks + } + ) + } + + pub fn get_content(&self, inbox_sk: &PrivKey) -> Result { + let ser = crypto_box::seal_open(&(*inbox_sk.to_dh().slice()).into(), &self.body.msg) + .map_err(|_| NgError::DecryptionError)?; + let content: InboxMsgContent = + serde_bare::from_slice(&ser).map_err(|_| NgError::SerializationError)?; + Ok(content) + } +} + +/// Content of `ClientEvent` +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum ClientEvent { + InboxPopRequest, +} + /// Content of `ClientMessageV0` #[derive(Clone, Debug, Serialize, Deserialize)] pub enum ClientMessageContentV0 { @@ -3963,6 +4296,8 @@ pub enum ClientMessageContentV0 { ClientResponse(ClientResponse), ForwardedEvent(Event), ForwardedBlock(Block), + InboxReceive{ msg: InboxMsg, from_queue: bool }, + ClientEvent(ClientEvent), } impl ClientMessageContentV0 { pub fn is_block(&self) -> bool { @@ -4006,9 +4341,7 @@ impl IStreamable for ClientMessage { match self { ClientMessage::V0(o) => match &o.content { ClientMessageContentV0::ClientResponse(r) => r.result(), - ClientMessageContentV0::ClientRequest(_) - | ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => { + _ => { panic!("it is not a response"); } }, @@ -4018,9 +4351,7 @@ impl IStreamable for ClientMessage { match self { ClientMessage::V0(o) => match &mut o.content { ClientMessageContentV0::ClientResponse(r) => r.set_result(result), - ClientMessageContentV0::ClientRequest(_) - | ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => { + _ => { panic!("it is not a response"); } }, @@ -4043,15 +4374,6 @@ impl ClientMessage { } } - pub fn forwarded_event(self) -> Option<(Event, OverlayId)> { - let overlay = self.overlay_id(); - match self { - ClientMessage::V0(o) => match o.content { - ClientMessageContentV0::ForwardedEvent(e) => Some((e, overlay)), - _ => None, - }, - } - } pub fn overlay_id(&self) -> OverlayId { match self { ClientMessage::V0(o) => o.overlay, @@ -4076,8 +4398,8 @@ impl ClientMessage { ClientMessage::V0(o) => match &o.content { ClientMessageContentV0::ClientResponse(r) => Some(r.id()), ClientMessageContentV0::ClientRequest(r) => Some(r.id()), - ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => None, + ClientMessageContentV0::ClientEvent(r) => Some(1), + _ => None, }, } } @@ -4086,8 +4408,7 @@ impl ClientMessage { ClientMessage::V0(o) => match &mut o.content { ClientMessageContentV0::ClientResponse(ref mut r) => r.set_id(id), ClientMessageContentV0::ClientRequest(ref mut r) => r.set_id(id), - ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => { + _ => { panic!("it is an event") } }, @@ -4098,9 +4419,7 @@ impl ClientMessage { match self { ClientMessage::V0(o) => match &o.content { ClientMessageContentV0::ClientResponse(r) => r.block(), - ClientMessageContentV0::ClientRequest(_) - | ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => { + _ => { panic!("it is not a response"); } }, @@ -4111,9 +4430,8 @@ impl ClientMessage { match self { ClientMessage::V0(o) => match &o.content { ClientMessageContentV0::ClientRequest(req) => req.get_actor(), - ClientMessageContentV0::ClientResponse(_) - | ClientMessageContentV0::ForwardedEvent(_) - | ClientMessageContentV0::ForwardedBlock(_) => { + ClientMessageContentV0::ClientEvent(req) => req.get_actor(1), + _ => { panic!("it is not a request"); } }, @@ -4857,7 +5175,7 @@ pub enum NgLink { V0(NgLinkV0), } -// TODO: PermaLinks and PostInbox (and ExtRequests) +// TODO: PermaLinks and InboxPost (and ExtRequests) #[cfg(test)] mod test { diff --git a/ng-net/src/utils.rs b/ng-net/src/utils.rs index 57f63ac9..3e05dd75 100644 --- a/ng-net/src/utils.rs +++ b/ng-net/src/utils.rs @@ -80,6 +80,10 @@ lazy_static! { Regex::new(r"^\[([0-9a-fA-F:]{3,39})\](\:\d{1,5})?$").unwrap(); } +pub fn decode_locator(string: &str) -> Result { + unimplemented!(); +} + #[doc(hidden)] pub fn parse_ipv4_and_port_for( string: String, diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs index c660bbd5..9aeea52e 100644 --- a/ng-repo/src/commit.rs +++ b/ng-repo/src/commit.rs @@ -130,7 +130,7 @@ impl CommitV0 { if self.id.is_some() && self.key.is_some() { return Ok(ObjectRef::from_id_key( self.id.unwrap(), - self.key.as_ref().unwrap().clone(), + self.key.to_owned().unwrap(), )); } // log_debug!("{:?}", self.header); @@ -308,6 +308,56 @@ impl Commit { } } + pub fn collect_block_ids( + commit_ref: ObjectRef, + store: &Store, + with_body: bool + ) -> Result, CommitLoadError> { + let mut block_ids : Vec; + let (id, key) = (commit_ref.id, commit_ref.key); + match Object::load(id, Some(key.clone()), store) { + Err(ObjectParseError::MissingHeaderBlocks((_, missing))) => { + return Err(CommitLoadError::MissingBlocks(missing)); + }, + Ok(obj) => { + let content = obj + .content() + .map_err(|e| CommitLoadError::ContentParseError(e))?; + let commit = match content { + ObjectContent::V0(ObjectContentV0::Commit(c)) => c, + _ => return Err(CommitLoadError::NotACommit), + }; + block_ids = obj.block_ids(); + + if with_body { + let content = commit.content_v0(); + let (id, key) = (content.body.id, content.body.key.clone()); + let obj = Object::load(id.clone(), Some(key.clone()), store).map_err(|e| match e { + ObjectParseError::MissingBlocks(missing) => CommitLoadError::MissingBlocks(missing), + _ => CommitLoadError::ObjectParseError, + })?; + let content = obj + .content() + .map_err(|_e| CommitLoadError::ObjectParseError)?; + match content { + ObjectContent::V0(ObjectContentV0::CommitBody(_)) => { + block_ids.append(&mut obj.block_ids()); + } + _ => return Err(CommitLoadError::NotACommitBody), + } + } + Ok(block_ids) + } + Err(ObjectParseError::MissingBlocks(missing)) => { + Err(CommitLoadError::MissingBlocks(missing)) + } + Err(_e) => { + log_err!("{:?}", _e); + Err(CommitLoadError::ObjectParseError) + } + } + } + /// Load commit from store pub fn load( commit_ref: ObjectRef, @@ -991,6 +1041,7 @@ impl CommitBody { CommitBodyV0::RemoveLink(_) => true, CommitBodyV0::AddSignerCap(_) => true, CommitBodyV0::RemoveSignerCap(_) => true, + CommitBodyV0::AddInboxCap(_) => true, CommitBodyV0::WalletUpdate(_) => true, CommitBodyV0::StoreUpdate(_) => true, _ => false, @@ -1128,6 +1179,7 @@ impl CommitBody { | CommitBodyV0::RemoveLink(_) | CommitBodyV0::AddSignerCap(_) | CommitBodyV0::RemoveSignerCap(_) + | CommitBodyV0::AddInboxCap(_) | CommitBodyV0::WalletUpdate(_) | CommitBodyV0::StoreUpdate(_) => vec![], }, @@ -1526,9 +1578,10 @@ impl fmt::Display for CommitBody { //CommitBodyV0::RemoveRepo(b) => write!(f, "RemoveRepo {}", b), CommitBodyV0::AddSignerCap(b) => write!(f, "AddSignerCap {}", b), CommitBodyV0::StoreUpdate(b) => write!(f, "StoreUpdate {}", b), + CommitBodyV0::AddInboxCap(b) => write!(f, "AddInboxCap {}", b), + /* AddLink(AddLink), RemoveLink(RemoveLink), - AddSignerCap(AddSignerCap), RemoveSignerCap(RemoveSignerCap), WalletUpdate(WalletUpdate), StoreUpdate(StoreUpdate), */ diff --git a/ng-repo/src/errors.rs b/ng-repo/src/errors.rs index 393f36fe..8f770ba0 100644 --- a/ng-repo/src/errors.rs +++ b/ng-repo/src/errors.rs @@ -62,6 +62,8 @@ pub enum NgError { StoreNotFound, UserNotFound, TopicNotFound, + InboxNotFound, + CommitNotFound, NotConnected, ActorError, ProtocolError(ProtocolError), @@ -91,6 +93,8 @@ pub enum NgError { InvalidClass, KeyShareNotFound, BrokerNotFound, + SparqlError(String), + ContactNotFound, } impl Error for NgError {} @@ -186,6 +190,7 @@ impl From for NgError { VerifierError::RepoNotFound => NgError::RepoNotFound, VerifierError::StoreNotFound => NgError::StoreNotFound, VerifierError::BranchNotFound => NgError::BranchNotFound, + VerifierError::SparqlError(s) => NgError::SparqlError(s), _ => NgError::VerifierError(e), } } @@ -381,6 +386,10 @@ pub enum VerifierError { InvalidNuri, InvalidJson, NothingToSign, + InvalidSocialQuery, + InvalidResponse, + SparqlError(String), + InboxError(String), } impl Error for VerifierError {} diff --git a/ng-repo/src/file.rs b/ng-repo/src/file.rs index 890dab5b..f7d3ff41 100644 --- a/ng-repo/src/file.rs +++ b/ng-repo/src/file.rs @@ -189,7 +189,7 @@ impl ReadFile for RandomAccessFile { recurse_tree( &self.store, - self.content_block.as_ref().unwrap().clone(), + self.content_block.to_owned().unwrap(), &mut res, self.meta.depth(), )?; @@ -230,7 +230,7 @@ impl ReadFile for RandomAccessFile { return Err(FileError::EndOfFile); } size = min(total - pos, size); - let mut current_block_id_key = self.content_block.as_ref().unwrap().clone(); + let mut current_block_id_key = self.content_block.to_owned().unwrap(); let depth = self.meta.depth(); let arity = self.meta.arity(); @@ -663,7 +663,7 @@ impl RandomAccessFile { if self.key.is_some() && self.id.is_some() { Some(ObjectRef::from_id_key( self.id.unwrap(), - self.key.as_ref().unwrap().clone(), + self.key.to_owned().unwrap(), )) } else { None @@ -813,7 +813,7 @@ mod test { .expect("new_from_slice"); log_debug!("{}", file); - let id = file.id.as_ref().unwrap().clone(); + let id = file.id.to_owned().unwrap(); let file_size = file.size(); log_debug!("file size to save : {}", file_size); @@ -887,7 +887,7 @@ mod test { .expect("new_from_slice"); log_debug!("{}", file); - let _id = file.id.as_ref().unwrap().clone(); + let _id = file.id.to_owned().unwrap(); log_debug!("data size: {}", data_size); @@ -1493,7 +1493,7 @@ mod test { let file = File::open( file.id().unwrap(), - file.key().as_ref().unwrap().clone(), + file.key().to_owned().unwrap(), store, ) .expect("open"); diff --git a/ng-repo/src/kcv_storage.rs b/ng-repo/src/kcv_storage.rs index e7aeee45..cf376e44 100644 --- a/ng-repo/src/kcv_storage.rs +++ b/ng-repo/src/kcv_storage.rs @@ -553,6 +553,29 @@ impl< } Ok(res) } + + pub fn take_first_value(&self, model: &mut Model) -> Result { + model.check_exists()?; + let key_prefix = model.key(); + let key_prefix_len = key_prefix.len(); + let total_size = key_prefix_len + self.value_size()?; + let val = model.storage().take_first_value( + self.prefix, + total_size, + key_prefix.to_vec(), + None, + &None, + )?; + Ok(from_slice(&val)?) + // ? { + // if val.0.len() == total_size + 1 { + // let col: Column = from_slice(&val.0[1 + key_prefix_len..total_size + 1])?; + // let val = from_slice(&val.1)?; + // res.insert(col, val); + // } + // } + // Ok(res) + } } impl< Model: IModel, @@ -956,6 +979,15 @@ pub trait WriteTransaction: ReadTransaction { family: &Option, ) -> Result<(), StorageError>; + fn take_first_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result, StorageError>; + /// Delete a property from the store. fn del( &self, @@ -1025,6 +1057,15 @@ pub trait ReadTransaction { family: &Option, ) -> Result>, StorageError>; + fn get_first_key_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result<(Vec,Vec), StorageError>; + /// Check if a specific value exists for a property from the store. fn has_property_value( &self, diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs index e334632a..8de92221 100644 --- a/ng-repo/src/repo.rs +++ b/ng-repo/src/repo.rs @@ -123,6 +123,8 @@ pub struct Repo { pub signer: Option, + pub inbox: Option, + pub certificate_ref: Option, pub members: HashMap, @@ -569,6 +571,7 @@ impl Repo { members, store, signer: None, + inbox: None, certificate_ref: None, read_cap: None, write_cap: None, diff --git a/ng-repo/src/store.rs b/ng-repo/src/store.rs index 022c7fa3..980ebdef 100644 --- a/ng-repo/src/store.rs +++ b/ng-repo/src/store.rs @@ -634,8 +634,8 @@ impl Store { creator, *branch_id, QuorumType::IamTheSignature, - vec![branch_info.read_cap.as_ref().unwrap().clone()], - vec![branch_info.read_cap.as_ref().unwrap().clone()], + vec![branch_info.read_cap.to_owned().unwrap()], + vec![branch_info.read_cap.to_owned().unwrap()], sync_sig_commit_body.clone(), &self, )?; @@ -681,6 +681,7 @@ impl Store { id: repo_pub_key, repo_def: repository, signer: Some(signer_cap), + inbox: None, members: HashMap::new(), store: Arc::clone(&self), read_cap: Some(root_branch_readcap), diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index c5d199ec..820d039d 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -613,6 +613,12 @@ pub enum OverlayId { Global, } +impl Default for OverlayId { + fn default() -> Self { + OverlayId::Outer([0;32]) + } +} + impl fmt::Display for OverlayId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mut ser = serde_bare::to_vec(&self).unwrap(); @@ -1953,6 +1959,54 @@ pub enum RemoveLink { V0(RemoveLinkV0), } + +/// Adds an Inbox Capability (privkey) into the user branch, so that a user can share with all its device. +/// +/// DEPS to the previous AddInboxCap commit(s) if it is an update. in this case, repo_id should match +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct AddInboxCapV0 { + /// Repo the Inbox is opened for + pub repo_id: RepoId, + + /// Overlay of the repo + pub overlay: OverlayId, + + pub priv_key: PrivKey, + + /// Metadata + #[serde(with = "serde_bytes")] + pub metadata: Vec, +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum AddInboxCap { + V0(AddInboxCapV0), +} + +impl AddInboxCap { + pub fn new_v0(repo_id: RepoId, overlay: OverlayId, priv_key: PrivKey) -> Self { + Self::V0(AddInboxCapV0{ + repo_id, + overlay, + priv_key, + metadata: vec![] + }) + } +} + +impl fmt::Display for AddInboxCap { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::V0(v0) => { + writeln!(f, "V0")?; + writeln!(f, "repo: {:?}", v0.repo_id)?; + writeln!(f, "cap: {:?}", v0.priv_key)?; + Ok(()) + } + } + } +} + /// Adds a SignerCap into the user branch, /// /// so that a user can share with all its device a new signing capability that was just created. @@ -2636,6 +2690,7 @@ pub enum CommitBodyV0 { // AddLink(AddLink), RemoveLink(RemoveLink), + AddInboxCap(AddInboxCap), AddSignerCap(AddSignerCap), RemoveSignerCap(RemoveSignerCap), WalletUpdate(WalletUpdate), diff --git a/ng-repo/src/utils.rs b/ng-repo/src/utils.rs index e90ae1a9..5178c2f3 100644 --- a/ng-repo/src/utils.rs +++ b/ng-repo/src/utils.rs @@ -149,10 +149,10 @@ pub fn keypair_from_ed(secret: SecretKey, public: PublicKey) -> (PrivKey, PubKey pub fn sign( author_privkey: &PrivKey, author_pubkey: &PubKey, - content: &Vec, + content: &[u8], ) -> Result { let keypair = pubkey_privkey_to_keypair(author_pubkey, author_privkey); - let sig_bytes = keypair.sign(content.as_slice()).to_bytes(); + let sig_bytes = keypair.sign(content).to_bytes(); // log_debug!( // "XXXX SIGN {:?} {:?} {:?}", // author_pubkey, @@ -166,7 +166,7 @@ pub fn sign( Ok(Sig::Ed25519Sig(ss)) } -pub fn verify(content: &Vec, sig: Sig, pub_key: PubKey) -> Result<(), NgError> { +pub fn verify(content: &[u8], sig: Sig, pub_key: PubKey) -> Result<(), NgError> { let pubkey = match pub_key { PubKey::Ed25519PubKey(pk) => pk, _ => panic!("cannot verify with Montgomery keys"), @@ -207,6 +207,13 @@ pub fn now_timestamp() -> Timestamp { .unwrap() } +pub fn now_precise_timestamp() -> (u64,u32) { + let dur = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap(); + (dur.as_secs(),dur.subsec_nanos()) +} + /// returns a new NextGraph Timestamp equivalent to the duration after now. pub fn timestamp_after(duration: Duration) -> Timestamp { (((SystemTime::now().duration_since(UNIX_EPOCH).unwrap() + duration).as_secs() diff --git a/ng-sdk-js/Cargo.toml b/ng-sdk-js/Cargo.toml index f21bee88..81804ee8 100644 --- a/ng-sdk-js/Cargo.toml +++ b/ng-sdk-js/Cargo.toml @@ -51,7 +51,7 @@ oxrdf = { git = "https://git.nextgraph.org/NextGraph/oxigraph.git", branch="mai [target.'cfg(target_arch = "wasm32")'.dependencies] js-sys = "0.3.61" serde-wasm-bindgen = "0.6" -wasm-bindgen-futures = "0.4.34" +wasm-bindgen-futures = "0.4.50" gloo-timers = "0.2.6" # web-sys = { version = "0.3.61", features = ["Window"] } diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 89bf29a4..220ae09f 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -19,6 +19,7 @@ use std::net::IpAddr; use std::str::FromStr; use std::sync::Arc; +use nextgraph::net::app_protocol::AppRequest; use ng_wallet::types::SensitiveWallet; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; @@ -40,7 +41,7 @@ use ng_repo::utils::{decode_key, decode_priv_key}; use ng_net::app_protocol::*; use ng_net::broker::*; -use ng_net::types::{BindAddress, ClientInfo, ClientInfoV0, ClientType, CreateAccountBSP, IP, BootstrapContentV0}; +use ng_net::types::{BindAddress, ClientInfo, ClientInfoV0, ClientType, CreateAccountBSP, IP, BootstrapContentV0, InboxPost}; use ng_net::utils::{ decode_invitation_string, parse_ip_and_port_for, retrieve_local_bootstrap, retrieve_local_url, spawn_and_log_error, Receiver, ResultSend, Sender, @@ -603,6 +604,47 @@ pub async fn rdf_dump(session_id: JsValue) -> Result { } } +/// from_profile_nuri = did:ng:a or did:ng:b +/// query_nuri = did:ng:o:c:k +/// contacts = did:ng:d:c or a sparql query +#[wasm_bindgen] +pub async fn social_query_start( + session_id: JsValue, + from_profile_nuri: String, + query_nuri: String, + contacts: String, + degree: JsValue, + ) -> Result<(), String> { + let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) + .map_err(|_| "Invalid session_id".to_string())?; + let degree: u16 = serde_wasm_bindgen::from_value::(degree) + .map_err(|_| "Invalid degree".to_string())?; + + let query = NuriV0::new_from_commit(&query_nuri).map_err(|e| format!("Invalid query_nuri {e}"))?; + + let from_profile = match from_profile_nuri.as_str() { + "did:ng:a" => NuriV0::new_public_store_target(), + "did:ng:b" => NuriV0::new_protected_store_target(), + _ => return Err("Invalid from_profile_nuri".to_string()) + }; + + if ! (contacts == "did:ng:d:c" || contacts.starts_with("SELECT")) { return Err("Invalid contacts".to_string()); } + + let mut request = AppRequest::social_query_start(from_profile, query, contacts, degree); + request.set_session_id(session_id); + + let res = nextgraph::local_broker::app_request(request) + .await + .map_err(|e: NgError| e.to_string())?; + + let AppResponse::V0(res) = res; + match res { + AppResponseV0::Ok => Ok(()), + AppResponseV0::Error(e) => Err(e.to_string()), + _ => Err("invalid response".to_string()), + } +} + #[wasm_bindgen] pub async fn branch_history(session_id: JsValue, nuri: JsValue) -> Result { let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) diff --git a/ng-storage-rocksdb/src/kcv_storage.rs b/ng-storage-rocksdb/src/kcv_storage.rs index d36a8da8..e78c6dcd 100644 --- a/ng-storage-rocksdb/src/kcv_storage.rs +++ b/ng-storage-rocksdb/src/kcv_storage.rs @@ -74,6 +74,21 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> { .get_all_keys_and_values_(prefix, key_size, key_prefix, suffix, iter) } + fn get_first_key_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result<(Vec,Vec), StorageError> { + let property_start = + RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix); + let iter = self.get_iterator(&property_start, &family)?; + self.store + .get_first_key_value_(prefix, key_size, key_prefix, suffix, iter) + } + fn get_all_properties_of_key( &self, prefix: u8, @@ -213,6 +228,20 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> { Ok(()) } + fn take_first_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result, StorageError> { + let (key,value) = self.get_first_key_value(prefix, key_size, key_prefix, suffix, family)?; + let key_without_prefix = key[1..].to_vec(); + self.del(prefix, &key_without_prefix, suffix, family)?; + Ok(value) + } + /// Delete a specific value for a property from the store. fn del_property_value( &self, @@ -342,6 +371,20 @@ impl ReadTransaction for RocksDbKCVStorage { self.get_all_keys_and_values_(prefix, key_size, key_prefix, suffix, iter) } + fn get_first_key_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result<(Vec,Vec), StorageError> { + let property_start = + RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix); + let iter = self.get_iterator(&property_start, &family)?; + self.get_first_key_value_(prefix, key_size, key_prefix, suffix, iter) + } + /// returns a map of found properties and their value. If `properties` is empty, then all the properties are returned. /// Otherwise, only the properties in the list are returned (if found in backend storage) fn get_all_properties_of_key( @@ -470,6 +513,23 @@ impl WriteTransaction for RocksDbKCVStorage { self.write_transaction(&mut |tx| tx.del(prefix, key, suffix, family)) } + fn take_first_value( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + family: &Option, + ) -> Result, StorageError> { + let mut value: Option> = None; + self.write_transaction(&mut |tx| { + let val = tx.take_first_value(prefix, key_size, key_prefix.clone(), suffix, family)?; + value = Some(val); + Ok(()) + })?; + Ok(value.unwrap()) + } + /// Delete a specific value for a property from the store. fn del_property_value( &self, @@ -612,6 +672,81 @@ impl RocksDbKCVStorage { Ok(vector) } + fn get_first_key_value_( + &self, + prefix: u8, + key_size: usize, + key_prefix: Vec, + suffix: Option, + mut iter: DBIteratorWithThreadMode<'_, impl ng_rocksdb::DBAccess>, + ) -> Result<(Vec,Vec), StorageError> { + if key_prefix.len() > key_size { + return Err(StorageError::InvalidValue); + } + + // let mut vec_key_start = key_prefix.clone(); + // let mut trailing_zeros = vec![0u8; key_size - key_prefix.len()]; + // vec_key_start.append(&mut trailing_zeros); + + let mut vec_key_end = key_prefix.clone(); + let mut trailing_max = vec![255u8; key_size - key_prefix.len()]; + vec_key_end.append(&mut trailing_max); + + // let property_start = Self::compute_property(prefix, &vec_key_start, suffix); + let property_end = + Self::compute_property(prefix, &vec_key_end, &Some(suffix.unwrap_or(255u8))); + + // let mut iter = match family { + // Some(cf) => self.db.iterator_cf( + // self.db + // .cf_handle(&cf) + // .ok_or(StorageError::UnknownColumnFamily)?, + // IteratorMode::From(&property_start, Direction::Forward), + // ), + // None => self + // .db + // .iterator(IteratorMode::From(&property_start, Direction::Forward)), + // }; + loop { + let res = iter.next(); + match res { + Some(Ok(val)) => { + //log_info!("{:?} {:?}", val.0, val.1); + match compare(&val.0, property_end.as_slice()) { + std::cmp::Ordering::Less | std::cmp::Ordering::Equal => { + if suffix.is_some() { + if val.0.len() < key_size + 2 + || val.0[1 + key_size] != suffix.unwrap() + { + // log_info!( + // "SKIPPED cause suffix {} {} {} {}", + // val.0.len(), + // key_size + 2, + // val.0[1 + key_size], + // suffix.unwrap() + // ); + continue; + } + // } else if val.0.len() > (key_size + 1) { + // continue; + } + return Ok((val.0.to_vec(), val.1.to_vec())); + } + _ => { + //log_info!("SKIPPED cause above END"); + break; + } //, + } + } + Some(Err(_e)) => return Err(StorageError::BackendError), + None => { + break; + } + } + } + Err(StorageError::NotFound) + } + fn calc_key_start( prefix: u8, key_size: usize, diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index 60d40e6d..068e5808 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -128,10 +128,14 @@ impl CommitVerifier for RootBranch { let signer = verifier .user_storage() .and_then(|storage| storage.get_signer_cap(&id).ok()); + let inbox = verifier + .user_storage() + .and_then(|storage| storage.get_inbox_cap(&id).ok()); let repo = Repo { id, repo_def: repository.clone(), signer, + inbox, members: HashMap::new(), store: Arc::clone(&store), read_cap: Some(reference), @@ -312,6 +316,21 @@ impl CommitVerifier for StoreUpdate { } } #[async_trait::async_trait] +impl CommitVerifier for AddInboxCap { + async fn verify( + &self, + _commit: &Commit, + verifier: &mut Verifier, + _branch_id: &BranchId, + _repo_id: &RepoId, + _store: Arc, + ) -> Result<(), VerifierError> { + match self { + AddInboxCap::V0(v0) => verifier.update_inbox_cap_v0(&v0), + } + } +} +#[async_trait::async_trait] impl CommitVerifier for AddSignerCap { async fn verify( &self, diff --git a/ng-verifier/src/commits/transaction.rs b/ng-verifier/src/commits/transaction.rs index 648625bf..ec5ebc04 100644 --- a/ng-verifier/src/commits/transaction.rs +++ b/ng-verifier/src/commits/transaction.rs @@ -248,6 +248,19 @@ impl Verifier { Ok(()) } + pub(crate) fn get_triples_from_transaction(commit_body: &CommitBody) -> Result, VerifierError> { + match commit_body { + CommitBody::V0(CommitBodyV0::AsyncTransaction(Transaction::V0(v0))) => { + let transac: TransactionBody = serde_bare::from_slice(v0)?; + if let Some(graph_transac) = transac.graph { + return Ok(graph_transac.inserts); + } + }, + _ => {} + } + Err(VerifierError::InvalidCommit) + } + pub(crate) async fn verify_async_transaction( &mut self, transaction: &Transaction, diff --git a/ng-verifier/src/inbox_processor.rs b/ng-verifier/src/inbox_processor.rs new file mode 100644 index 00000000..d84ab68f --- /dev/null +++ b/ng-verifier/src/inbox_processor.rs @@ -0,0 +1,589 @@ +// Copyright (c) 2022-2025 Niko Bonnieure, Par le Peuple, NextGraph.org developers +// All rights reserved. +// Licensed under the Apache License, Version 2.0 +// +// or the MIT license , +// at your option. All files in the project carrying such +// notice may not be copied, modified, or distributed except +// according to those terms. + +//! Processor for each type of InboxMsgContent + +use ng_net::actor::SoS; +use ng_oxigraph::oxigraph::sparql::QueryResults; +use ng_oxigraph::oxrdf::{NamedNode, Term, Triple}; +use ng_oxigraph::oxsdatatypes::DateTime; +use ng_repo::types::{Block, ObjectRef, OverlayId, PrivKey, RepoId, StoreRepo, StoreRepoV0}; +use ng_repo::{errors::*, store::Store, types::Commit}; +use ng_repo::log::*; + +use ng_net::types::{InboxMsg, InboxMsgContent, InboxPost, SocialQuery, SocialQueryResponse, SocialQueryResponseContent}; +use ng_net::app_protocol::*; + +use crate::verifier::*; + +impl Verifier { + + async fn post_to_inbox(&self, post: InboxPost) -> Result<(), VerifierError> { + match self.client_request::<_,()>(post).await + { + Err(e) => Err(VerifierError::InboxError(e.to_string())), + Ok(SoS::Stream(_)) => Err(VerifierError::InboxError(NgError::InvalidResponse.to_string())), + Ok(SoS::Single(_)) => Ok(()), + } + } + + pub(crate) async fn create_social_query_forwarder( + &mut self, + social_query_doc_nuri_string: &String, + from_forwarder_nuri_string: &String, + from_profile_nuri_string: &String, + from_inbox_nuri_string: &String, + ) -> Result<(String,NuriV0), VerifierError> { + // creating the ForwardedSocialQuery in the private store + let forwarder = self.doc_create_with_store_repo( + "Graph".to_string(), "social:query:forwarded".to_string(), + "store".to_string(), None // meaning in private store + ).await?; + let forwarder_nuri = NuriV0::new_from_repo_graph(&forwarder)?; + let forwarder_id = forwarder_nuri.target.repo_id().clone(); + let forwarder_nuri_string = NuriV0::repo_id(&forwarder_id); + + // adding triples in forwarder doc : ng:social_query_id + let sparql_update = format!(" PREFIX ng: + PREFIX xsd: + INSERT DATA {{ <> ng:social_query_id <{social_query_doc_nuri_string}>. + <> ng:social_query_forwarder <{from_forwarder_nuri_string}>. + <> ng:social_query_from_inbox <{from_inbox_nuri_string}>. + <> ng:social_query_from_profile <{from_profile_nuri_string}>. + <> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now()); + let ret = self + .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![]) + .await; + if let Err(e) = ret { + return Err(VerifierError::SparqlError(e)); + } + Ok((forwarder_nuri_string,forwarder_nuri)) + } + + pub(crate) async fn mark_social_query_forwarder(&mut self, forwarder_nuri_string: &String, forwarder_nuri: &NuriV0, predicate: String) -> Result<(), VerifierError> { + + // adding triples in forwarder doc : ng:social_query_id + let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> \"{}\"^^ . }}",DateTime::now()); + let ret = self + .process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![]) + .await; + if let Err(e) = ret { + return Err(VerifierError::SparqlError(e)); + } + Ok(()) + } + + fn get_privkey_of_inbox(&self, this_overlay: &OverlayId) -> Result { + let store = self.get_store_by_overlay_id(this_overlay)?; + let repo = self.repos.get(&store.id()).ok_or(NgError::RepoNotFound)?; + let from_inbox = repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?; + Ok(from_inbox) + } + + pub(crate) fn get_profile_replying_to(&self, forwarded_from_profile: &String) -> Result< + (OverlayId, PrivKey) ,NgError> { + + let from_profile_id = if forwarded_from_profile.starts_with("did:ng:b") { + self.config.protected_store_id.unwrap() + } else { + self.config.public_store_id.unwrap() + }; + + let repo = self.repos.get(&from_profile_id).ok_or(NgError::RepoNotFound)?; + let inbox = repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?; + let overlay = repo.store.get_store_repo().outer_overlay(); + + Ok( (overlay, inbox.clone()) ) + } + + pub(crate) fn get_2_profiles(&self) -> Result<( + (StoreRepo, PrivKey), // public + (StoreRepo, PrivKey) // protected + ) ,NgError> { + + let protected_store_id = self.config.protected_store_id.unwrap(); + let protected_repo = self.repos.get(&protected_store_id).ok_or(NgError::RepoNotFound)?; + let protected_inbox = protected_repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?; + let protected_store_repo = protected_repo.store.get_store_repo(); + + let public_store_id = self.config.public_store_id.unwrap(); + let public_repo = self.repos.get(&public_store_id).ok_or(NgError::RepoNotFound)?; + let public_inbox = public_repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?; + let public_store_repo = public_repo.store.get_store_repo(); + + Ok(( + (*public_store_repo, public_inbox.clone()), + (*protected_store_repo, protected_inbox.clone()) + )) + } + + pub(crate) async fn social_query_dispatch( + &mut self, + to_profile_nuri: &String, + to_inbox_nuri: &String, + forwarder_nuri: &NuriV0, + forwarder_id: &RepoId, + from_profiles: &( + (StoreRepo, PrivKey), // public + (StoreRepo, PrivKey) // protected + ), + query_id: &RepoId, + definition_commit_body_ref: &ObjectRef, + blocks: &Vec, + degree: u16, + ) -> Result<(), VerifierError> { + + // first add an entry in the local forwarded social query, to monitor progress + let sparql_update = format!(" + PREFIX ng: + INSERT DATA {{ + ng:social_query_forwarded_to_profile <{to_profile_nuri}> . + ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> . + }}"); + let ret = self + .process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![]) + .await; + if let Err(e) = ret { + return Err(VerifierError::SparqlError(e)); + } + // then send InboxPost message. + + let from_profile = if to_profile_nuri.starts_with("did:ng:b") { + &from_profiles.1 + } else { + &from_profiles.0 + }; + + self.post_to_inbox(InboxPost::new_social_query_request( + from_profile.0, + from_profile.1.clone(), + *forwarder_id, + to_profile_nuri.clone(), + to_inbox_nuri.clone(), + None, + *query_id, + definition_commit_body_ref.clone(), + blocks.to_vec(), + degree, + )?).await?; + + Ok(()) + } + + pub(crate) async fn process_inbox( + &mut self, + msg: InboxMsg, + content: InboxMsgContent, + ) -> Result<(), VerifierError> { + + match content { + InboxMsgContent::SocialQuery(SocialQuery::Request(req)) => { + + let profile_id_nuri = NuriV0::from_store_repo_string(&req.from_profile_store_repo); + + //TODO: check that msg.body.from_overlay matches with req.from_profile_store_repo + + //TODO: check that this contact is mutual req.from_profile_store_repo must be in our contact list + + // getting the privkey of the inbox because we will need it here below to send responses. + let reply_with_inbox = self.get_privkey_of_inbox(&msg.body.to_overlay)?; + + let social_query_doc_nuri_string: String = NuriV0::repo_id(&req.query_id); + + // checking that we didn't process this query ID yet. if we did, return a SocialQueryResponseContent::AlreadyRequested + match self.sparql_query( + &NuriV0::new_entire_user_site(), + format!("ASK {{ ?s <{social_query_doc_nuri_string}> }}"), None).await? + { + QueryResults::Boolean(true) => { + let post = InboxPost::new_social_query_response_replying_to( + &msg.body, + &req, + SocialQueryResponseContent::AlreadyRequested, + reply_with_inbox.clone() + )?; + self.post_to_inbox(post).await?; + return Ok(()); + } + _ => {} + } + + // otherwise, create the forwarder + let (forwarder_nuri_string, forwarder_nuri) = self.create_social_query_forwarder( + &social_query_doc_nuri_string, + &NuriV0::repo_id(&req.forwarder_id), + &NuriV0::from_store_repo_string(&req.from_profile_store_repo), + &NuriV0::inbox(&msg.body.from_inbox.unwrap()) + ).await?; + + let temp_mini_block_storage = Store::new_temp_in_mem(); + for block in msg.blocks.iter() { + let _id = temp_mini_block_storage.put(block)?; + } + let commit = Commit::load(req.definition_commit_body_ref.clone(), + &temp_mini_block_storage, true) + .map_err(|e| { + //log_err!("err : {:?}", e); + e + })?; + + let triples = Verifier::get_triples_from_transaction(commit.body().unwrap())?; + + let mut sparql: Option = None; + for triple in triples { + if triple.predicate.as_str() == "did:ng:x:ng#social_query_sparql" { + sparql = Some( + match triple.object { + Term::Literal(l) => l.value().into(), + _ => return Err(VerifierError::InvalidSocialQuery) + }); + break; + } + } + //TODO: in case of errors here below, mark the forwarder as ng:social_query_error + if sparql.is_none() { return Err(VerifierError::InvalidSocialQuery); } + + log_info!("{}",sparql.as_ref().unwrap()); + + let res = self.sparql_query(&NuriV0::new_entire_user_site(), sparql.unwrap(), None).await?; + + let results = match res { + QueryResults::Boolean(_) | QueryResults::Solutions(_) => return Err(VerifierError::NotImplemented), + QueryResults::Graph(triples) => { + let mut results = vec![]; + for t in triples { + match t { + Err(e) => { log_err!("{}",e.to_string()); return Err(VerifierError::SparqlError(e.to_string()))}, + Ok(triple) => results.push(triple), + } + } + results + } + }; + + log_info!("{:?}",results); + + // Do we have local results matching the request's query? If yes, we send them back to the forwarder right away + if !results.is_empty() { + let content = SocialQueryResponseContent::Graph(serde_bare::to_vec(&results).unwrap()); + let post = InboxPost::new_social_query_response_replying_to( + &msg.body, + &req, + content, + reply_with_inbox.clone() + )?; + self.post_to_inbox(post).await?; + } + + // only fan out if we have contacts (that match the grant selected by current user) + // and if degree is > to 1 or equal to zero + if req.degree == 1 { + + // ending here. + self.mark_social_query_forwarder(&forwarder_nuri_string, &forwarder_nuri, "social_query_ended".to_string()).await?; + let post = InboxPost::new_social_query_response_replying_to( + &msg.body, + &req, + SocialQueryResponseContent::EndOfReplies, + reply_with_inbox.clone() + )?; + self.post_to_inbox(post).await?; + + return Ok(()) + } + // fan out forwarded social queries to all contacts (except the one we received it from) + + // getting the contacts to forward to + let sparql = format!("PREFIX ng: + SELECT ?profile_id ?inbox_id WHERE + {{ ?c ng:c \"social:contact\" . + OPTIONAL {{ ?c ng:site ?profile_id . ?c ng:site_inbox ?inbox_id }} + OPTIONAL {{ ?c ng:protected ?profile_id . ?c ng:protected_inbox ?inbox_id }} + FILTER ( bound(?profile_id) && NOT EXISTS {{ ?c ng:site <{profile_id_nuri}> }} && NOT EXISTS {{ ?c ng:protected <{profile_id_nuri}> }} ) + }}"); + log_info!("{sparql}"); + let sols = match self.sparql_query( + &NuriV0::new_entire_user_site(), + sparql, None).await? + { + QueryResults::Solutions(sols) => { sols } + _ => return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())), + }; + + let degree = if req.degree == 0 { 0 } else { req.degree - 1 }; + log_info!("new degree {degree}"); + let mut found_contact = false; + let forwarder_id = forwarder_nuri.target.repo_id().clone(); + + let from_profiles = self.get_2_profiles()?; + + for sol in sols { + match sol { + Err(e) => return Err(VerifierError::SparqlError(e.to_string())), + Ok(s) => { + if let Some(Term::NamedNode(profile_id)) = s.get("profile_id") { + let to_profile_nuri = profile_id.as_string(); + if let Some(Term::NamedNode(inbox_id)) = s.get("inbox_id") { + let to_inbox_nuri = inbox_id.as_string(); + + found_contact = true; + + self.social_query_dispatch( + to_profile_nuri, + to_inbox_nuri, + &forwarder_nuri, + &forwarder_id, + &from_profiles, + &req.query_id, + &req.definition_commit_body_ref, + &msg.blocks, + degree + ).await?; + } + } + } + } + } + // if not found any contact, we stop here + log_info!("found contact {found_contact}"); + if !found_contact { + self.mark_social_query_forwarder(&forwarder_nuri_string, &forwarder_nuri, "social_query_ended".to_string()).await?; + let post = InboxPost::new_social_query_response_replying_to( + &msg.body, + &req, + SocialQueryResponseContent::EndOfReplies, + reply_with_inbox + )?; + self.post_to_inbox(post).await?; + } + + } + InboxMsgContent::SocialQuery(SocialQuery::Response(response)) => { + + if msg.body.from_inbox.is_none() { + // TODO log error + // we do nothing as this is invalid msg. it must have a from. + return Ok(()) + } + + // TODO: first we open the response.forwarder_id (because in webapp, it might not be loaded yet) + + let forwarder_nuri = NuriV0::new_repo_target_from_id(&response.forwarder_id); + let forwarder_nuri_string = NuriV0::repo_id(&response.forwarder_id); + // checking that we do have a running ForwardedSocialQuery, and that it didnt end, otherwise it must be spam. + match self.sparql_query( &forwarder_nuri, format!("ASK {{ <> <{}> }} ", + NuriV0::repo_id(&response.query_id)),Some(forwarder_nuri_string.clone())).await? { + QueryResults::Boolean(true) => {} + _ => { return Err(VerifierError::InvalidSocialQuery) } + } + let (forwarded_from_profile, forwarded_from_inbox, from_forwarder) = match self.sparql_query( + &forwarder_nuri, + "PREFIX ng: + SELECT ?from_profile ?from_inbox ?from_forwarder ?ended WHERE + {{ <> ng:social_query_from_profile ?from_profile . + <> ng:social_query_from_inbox ?from_inbox . + <> ng:social_query_forwarder ?from_forwarder . + <> ng:social_query_ended ?ended . + }}".to_string(), + Some(forwarder_nuri_string)).await? + { + QueryResults::Solutions(mut sols) => { + match sols.next() { + None => { + log_info!("at origin"); + (None, None, None) + } + Some(Err(e)) => { + // TODO log error + // we do nothing as we couldn't find the ForwardedSocialQuery + return Err(VerifierError::SparqlError(e.to_string())); + } + Some(Ok(sol)) => { + if let Some(Term::NamedNode(_)) = sol.get("ended") { + // TODO log error : someone is giving back some results while the forwarder is ended + return Ok(()) + }; + let from_profile = if let Some(Term::NamedNode(nuri)) = sol.get("from_profile") { + Some(nuri.as_string().clone()) + } else { + None + }; + let from_inbox = if let Some(Term::NamedNode(nuri)) = sol.get("from_inbox") { + Some(nuri.as_string().clone()) + } else { + None + }; + let from_forwarder = if let Some(Term::NamedNode(nuri)) = sol.get("from_forwarder") { + Some(nuri.as_string().clone()) + } else { + None + }; + + (from_profile, from_inbox, from_forwarder) + } + } + } + _ => return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())), + }; + + // searching for the tokenized commit that added this forwarding. + let spar = format!("PREFIX ng: + SELECT ?token WHERE + {{ ?token ng:social_query_forwarded_to_inbox <{}> . + MINUS {{ ?token ng:social_query_ended ?t . }} . + }}", + NuriV0::inbox(&msg.body.from_inbox.unwrap()) + ); + log_info!("{spar}"); + let token = match self.sparql_query( + &forwarder_nuri, + //<> ng:social_query_id <{}> NuriV0::inbox(&msg.body.from_inbox.unwrap()), + spar, + Some(NuriV0::repo_id(&response.forwarder_id))).await? + { + QueryResults::Solutions(mut sols) => { + match sols.next() { + None => { return Err(VerifierError::SparqlError("Token not found".to_string())); } + Some(Err(e)) => { + // TODO log error + // we do nothing as we couldn't find the token + return Err(VerifierError::SparqlError(e.to_string())); + } + Some(Ok(sol)) => { + if let Some(Term::NamedNode(token)) = sol.get("token") { + token.as_string().clone() + } else { + // TODO log error + // we do nothing as we couldn't find the token + return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())); + } + } + } + } + _ => return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())), + }; + log_info!("token = {token}"); + + let at_origin = forwarded_from_profile.is_none() || forwarded_from_inbox.is_none() || from_forwarder.is_none(); + + match response.content { + SocialQueryResponseContent::AlreadyRequested + | SocialQueryResponseContent::EndOfReplies + | SocialQueryResponseContent::Error(_) => { + // ending here this forwarding. + self.mark_social_query_forwarder(&token, &forwarder_nuri, "social_query_ended".to_string()).await?; + // TODO record error + + // if we are at the end of the whole ForwardedSocialQuery (no more pending responses) + // we send EndOfReplies upstream, and mark as ended. + + let the_end = match self.sparql_query( + &forwarder_nuri, + format!("PREFIX ng: + SELECT ?token WHERE + {{ ?token ng:social_query_forwarded_to_profile ?p . + MINUS {{ ?token ng:social_query_ended ?t . }} + }}"), + None).await? + { + QueryResults::Solutions(mut sols) => { + match sols.next() { + None => true, + _ => false, + } + } + _ => { + // TODO: log error + false + } + }; + if the_end { + // marking the end + self.mark_social_query_forwarder(&NuriV0::repo_id(&response.forwarder_id), &forwarder_nuri, "social_query_ended".to_string()).await?; + + if !at_origin { + // getting the privkey of the inbox because we will need it here below to send responses. + let from = self.get_profile_replying_to(forwarded_from_profile.as_ref().unwrap())?; + + // sending EndOfReplies upstream + let to_overlay = NuriV0::from_profile_into_overlay_id(forwarded_from_profile.as_ref().unwrap())?; + let to_inbox_id = NuriV0::from_inbox_into_id(forwarded_from_inbox.as_ref().unwrap())?; + let from_forwarder = NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?; + let post = InboxPost::new_social_query_response( + to_overlay, + to_inbox_id, + Some(from), + response.query_id, + from_forwarder, + SocialQueryResponseContent::EndOfReplies + )?; + self.post_to_inbox(post).await?; + } + } + } + SocialQueryResponseContent::Graph(graph) => { + + if at_origin { + + // insert the triples in the query document + let triples: Vec = serde_bare::from_slice(&graph)?; + + if triples.is_empty() { + return Err(VerifierError::InvalidResponse); + } + + let overlay_id = self.repos.get(&response.query_id).ok_or(VerifierError::RepoNotFound)?.store.outer_overlay(); + let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id); + let graph_name = NamedNode::new_unchecked(&nuri_ov); + + let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect(); + + // let quad = Quad { + // subject: NamedNode::new_unchecked(&nuri).into(), + // predicate: NG_ONTOLOGY_CLASS_NAME.clone().into(), + // object: Literal::new_simple_literal(primary_class).into(), + // graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), + // }; + let commits = self.prepare_sparql_update(quads, vec![], vec![]).await?; + + + } else { + + // we forward upstream + + // getting the privkey of the inbox because we will need it here below to send responses. + let from = self.get_profile_replying_to(forwarded_from_profile.as_ref().unwrap())?; + + let to_overlay = NuriV0::from_profile_into_overlay_id(forwarded_from_profile.as_ref().unwrap())?; + let to_inbox_id = NuriV0::from_inbox_into_id(forwarded_from_inbox.as_ref().unwrap())?; + let from_forwarder = NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?; + let post = InboxPost::new_social_query_response( + to_overlay, + to_inbox_id, + Some(from), + response.query_id, + from_forwarder, + SocialQueryResponseContent::Graph(graph) + )?; + self.post_to_inbox(post).await?; + } + + } + SocialQueryResponseContent::QueryResult(_) | SocialQueryResponseContent::False | SocialQueryResponseContent::True => { + // not implemented yet + unimplemented!(); + } + } + + } + _ => unimplemented!() + } + Ok(()) + } +} \ No newline at end of file diff --git a/ng-verifier/src/lib.rs b/ng-verifier/src/lib.rs index 18ae899a..dfe5d12c 100644 --- a/ng-verifier/src/lib.rs +++ b/ng-verifier/src/lib.rs @@ -20,6 +20,8 @@ mod commits; mod request_processor; +mod inbox_processor; + #[cfg(all(not(target_family = "wasm"), not(docsrs)))] mod rocksdb_user_storage; diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 9baa017b..3095b33a 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -15,8 +15,12 @@ use std::sync::Arc; use futures::channel::mpsc; use futures::SinkExt; use futures::StreamExt; +use ng_net::actor::SoS; +use ng_net::types::InboxPost; +use ng_oxigraph::oxigraph::sparql::EvaluationError; use ng_oxigraph::oxigraph::sparql::{results::*, Query, QueryResults}; use ng_oxigraph::oxrdf::{Literal, NamedNode, Quad, Term}; +use ng_oxigraph::oxsdatatypes::DateTime; use ng_repo::errors::*; use ng_repo::file::{RandomAccessFile, ReadFile}; @@ -543,6 +547,107 @@ impl Verifier { Ok(res) } + pub(crate) async fn doc_create_with_store_repo( + &mut self, + crdt: String, + class_name: String, + destination: String, + store_repo: Option, + ) -> Result { + + let class = BranchCrdt::from(crdt, class_name)?; + + let nuri = if store_repo.is_none() { + NuriV0::new_private_store_target() + } else { + NuriV0::from_store_repo(&store_repo.unwrap()) + }; + + let destination = DocCreateDestination::from(destination)?; + + self.doc_create(nuri, DocCreate { + class, + destination, + }).await + } + + pub(crate) async fn sparql_query(&self, nuri: &NuriV0, sparql: String, base: Option) -> Result { + //log_debug!("query={}", query); + let store = self.graph_dataset.as_ref().unwrap(); + let mut parsed = Query::parse(&sparql, base.as_deref()) + .map_err(|e| VerifierError::SparqlError(e.to_string()))?; + let dataset = parsed.dataset_mut(); + //log_debug!("DEFAULTS {:?}", dataset.default_graph_graphs()); + if dataset.has_no_default_dataset() { + //log_info!("DEFAULT GRAPH AS UNION"); + dataset.set_default_graph_as_union(); + } + store + .query(parsed, self.resolve_target_for_sparql(&nuri.target, false)?) + .map_err(|e| VerifierError::SparqlError(e.to_string())) + } + + pub(crate) async fn doc_create( + &mut self, + nuri: NuriV0, + doc_create: DocCreate + ) -> Result { + //TODO: deal with doc_create.destination + + let user_id = self.user_id().clone(); + let user_priv_key = self.user_privkey().clone(); + let primary_class = doc_create.class.class().clone(); + let (_,_,store) = self.resolve_target(&nuri.target)?; + let repo_id = self + .new_repo_default( + &user_id, + &user_priv_key, + &store, + doc_create.class, + ) + .await?; + + let header_branch_id = { + let repo = self.get_repo(&repo_id, &store)?; + repo.header_branch().ok_or(NgError::BranchNotFound)?.id + }; + + // adding an AddRepo commit to the Store branch of store. + self.send_add_repo_to_store(&repo_id, &store) + .await?; + + // adding an ldp:contains triple to the store main branch + let overlay_id = store.outer_overlay(); + let nuri = NuriV0::repo_id(&repo_id); + let nuri_result = NuriV0::repo_graph_name(&repo_id, &overlay_id); + let store_nuri = NuriV0::from_store_repo(&store); + let store_nuri_string = NuriV0::repo_id(store.repo_id()); + let query = format!("INSERT DATA {{ <{store_nuri_string}> <{nuri}>. }}"); + + let ret = self + .process_sparql_update(&store_nuri, &query, &None, vec![]) + .await; + if let Err(e) = ret { + return Err(NgError::SparqlError(e)); + } + + self.add_doc(&repo_id, &overlay_id)?; + + // adding the class triple to the header branch + let header_branch_nuri = format!("{nuri_result}:b:{}", header_branch_id); + let quad = Quad { + subject: NamedNode::new_unchecked(&nuri).into(), + predicate: NG_ONTOLOGY_CLASS_NAME.clone().into(), + object: Literal::new_simple_literal(primary_class).into(), + graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), + }; + let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; + if let Err(e) = ret { + return Err(NgError::SparqlError(e.to_string())); + } + Ok(nuri_result) + } + pub(crate) async fn process( &mut self, command: &AppRequestCommandV0, @@ -550,6 +655,128 @@ impl Verifier { payload: Option, ) -> Result { match command { + AppRequestCommandV0::SocialQueryStart => { + let (from_profile, contacts_string, degree) = if let Some(AppRequestPayload::V0(AppRequestPayloadV0::SocialQueryStart{ + from_profile, contacts, degree + })) = + payload + { (from_profile, contacts, degree) } + else { + return Err(NgError::InvalidPayload); + }; + + // TODO: search for contacts (all stores, one store, a sparql query, etc..) + // (profile_nuri, inbox_nuri) + let contacts = if contacts_string.as_str() == "did:ng:d:c" { + let mut res = vec![]; + res.push(("did:ng:a:rjoQTS4LMBDcuh8CEjmTYrgALeApBg2cgKqyPEuQDUgA".to_string(),"did:ng:d:KMFdOcGjdFBQgA9QNEDWcgEErQ1isbvDe7d_xndNOUMA".to_string())); + res + } else { + unimplemented!(); + }; + + // if no contact found, return here with an AppResponse::error + if contacts.is_empty() { + return Ok(AppResponse::error(NgError::ContactNotFound.to_string())); + } + + //resolve from_profile + let from_profile_id = match from_profile.target { + NuriTargetV0::ProtectedProfile => { + self.config.protected_store_id.unwrap() + } + NuriTargetV0::PublicProfile => { + self.config.public_store_id.unwrap() + }, + _ => return Err(NgError::InvalidNuri) + }; + let store = { + let repo = self.repos.get(&from_profile_id).ok_or(NgError::RepoNotFound)?; + repo.store.clone() + }; + let query_id = nuri.target.repo_id(); + let definition_commit_body_ref = nuri.get_first_commit_ref()?; + let block_ids = Commit::collect_block_ids(definition_commit_body_ref.clone(), &store, true)?; + let mut blocks= Vec::with_capacity(block_ids.len()); + //log_info!("blocks nbr {}",block_ids.len()); + for bid in block_ids.iter() { + blocks.push(store.get(bid)?); + } + + // creating the ForwardedSocialQuery in the private store + let forwarder = self.doc_create_with_store_repo( + "Graph".to_string(), "social:query:forwarded".to_string(), + "store".to_string(), None // meaning in private store + ).await?; + let forwarder_nuri = NuriV0::new_from_repo_graph(&forwarder)?; + let forwarder_id = forwarder_nuri.target.repo_id().clone(); + let forwarder_nuri_string = NuriV0::repo_id(&forwarder_id); + + // adding triples in social_query doc : ng:social_query_forwarder + let social_query_doc_nuri_string = NuriV0::repo_id(query_id); + let sparql_update = format!("INSERT DATA {{ <{social_query_doc_nuri_string}> <{forwarder_nuri_string}>. }}"); + let ret = self + .process_sparql_update(&nuri, &sparql_update, &None, vec![]) + .await; + if let Err(e) = ret { + return Err(NgError::SparqlError(e)); + } + + // adding triples in forwarder doc : ng:social_query_id and ng:social_query_started + let sparql_update = format!("INSERT DATA {{ <> <{social_query_doc_nuri_string}> . + <> \"{}\"^^ . }}",DateTime::now()); + let ret = self + .process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string), vec![]) + .await; + if let Err(e) = ret { + log_err!("{sparql_update}"); + return Err(NgError::SparqlError(e)); + } + + let from_profiles = self.get_2_profiles()?; + + for (to_profile_nuri, to_inbox_nuri) in contacts { + + match self.social_query_dispatch( + &to_profile_nuri, + &to_inbox_nuri, + &forwarder_nuri, + &forwarder_id, + &from_profiles, + query_id, + &definition_commit_body_ref, + &blocks, + degree + ).await { + Ok(_) => {} + Err(e) => return Ok(AppResponse::error(e.to_string())), + } + } + + return Ok(AppResponse::ok()); + + // // FOR THE SAKE OF TESTING + // let to_profile_nuri = NuriV0::public_profile(&from_profile_id); + // let to_inbox_nuri: String = NuriV0::inbox(&from_inbox.to_pub()); + // let post = InboxPost::new_social_query_request( + // store.get_store_repo().clone(), + // from_inbox, + // forwarder_id, + // to_profile_nuri, + // to_inbox_nuri, + // None, + // *query_id, + // definition_commit_body_ref, + // blocks, + // degree, + // )?; + // return match self.client_request::<_,()>(post).await + // { + // Err(e) => Ok(AppResponse::error(e.to_string())), + // Ok(SoS::Stream(_)) => Ok(AppResponse::error(NgError::InvalidResponse.to_string())), + // Ok(SoS::Single(_)) => Ok(AppResponse::ok()), + // }; + } AppRequestCommandV0::Header => { if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Header(doc_header))) = payload @@ -625,67 +852,17 @@ impl Verifier { } } AppRequestCommandV0::Create => { - if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Create(doc_create))) = + return if let Some(AppRequestPayload::V0(AppRequestPayloadV0::Create(doc_create))) = payload { - //TODO: deal with doc_create.destination - - let user_id = self.user_id().clone(); - let user_priv_key = self.user_privkey().clone(); - let primary_class = doc_create.class.class().clone(); - let (_,_,store) = self.resolve_target(&nuri.target)?; - let repo_id = self - .new_repo_default( - &user_id, - &user_priv_key, - &store, - doc_create.class, - ) - .await?; - - let header_branch_id = { - let repo = self.get_repo(&repo_id, &store)?; - repo.header_branch().ok_or(NgError::BranchNotFound)?.id - }; - - // adding an AddRepo commit to the Store branch of store. - self.send_add_repo_to_store(&repo_id, &store) - .await?; - - // adding an ldp:contains triple to the store main branch - let overlay_id = store.outer_overlay(); - let nuri = NuriV0::repo_id(&repo_id); - let nuri_result = NuriV0::repo_graph_name(&repo_id, &overlay_id); - let store_nuri = NuriV0::from_store_repo(&store); - let store_nuri_string = NuriV0::repo_id(store.repo_id()); - let query = format!("INSERT DATA {{ <{store_nuri_string}> <{nuri}>. }}"); - - let ret = self - .process_sparql_update(&store_nuri, &query, &None, vec![]) - .await; - if let Err(e) = ret { - return Ok(AppResponse::error(e)); - } - - self.add_doc(&repo_id, &overlay_id)?; - - // adding the class triple to the header branch - let header_branch_nuri = format!("{nuri_result}:b:{}", header_branch_id); - let quad = Quad { - subject: NamedNode::new_unchecked(&nuri).into(), - predicate: NG_ONTOLOGY_CLASS_NAME.clone().into(), - object: Literal::new_simple_literal(primary_class).into(), - graph_name: NamedNode::new_unchecked(&header_branch_nuri).into(), - }; - let ret = self.prepare_sparql_update(vec![quad], vec![], vec![]).await; - if let Err(e) = ret { - return Ok(AppResponse::error(e.to_string())); + match self.doc_create(nuri, doc_create).await { + Err(NgError::SparqlError(e)) => Ok(AppResponse::error(e)), + Err(e) => Err(e), + Ok(nuri_result) => Ok(AppResponse::V0(AppResponseV0::Nuri(nuri_result))) } - - return Ok(AppResponse::V0(AppResponseV0::Nuri(nuri_result))); } else { - return Err(NgError::InvalidPayload); - } + Err(NgError::InvalidPayload) + }; } AppRequestCommandV0::Fetch(fetch) => match fetch { AppFetchContentV0::Header => { @@ -742,22 +919,9 @@ impl Verifier { base, }))) = payload { - //log_debug!("query={}", query); - let store = self.graph_dataset.as_ref().unwrap(); - let parsed = Query::parse(&sparql, base.as_deref()); - if parsed.is_err() { - return Ok(AppResponse::error(parsed.unwrap_err().to_string())); - } - let mut parsed = parsed.unwrap(); - let dataset = parsed.dataset_mut(); - //log_debug!("DEFAULTS {:?}", dataset.default_graph_graphs()); - if dataset.has_no_default_dataset() { - //log_info!("DEFAULT GRAPH AS UNION"); - dataset.set_default_graph_as_union(); - } - let results = store - .query(parsed, self.resolve_target_for_sparql(&nuri.target, false)?); + let results = self.sparql_query(&nuri, sparql, base).await; return Ok(match results { + Err(VerifierError::SparqlError(s)) => AppResponse::error(s), Err(e) => AppResponse::error(e.to_string()), Ok(qr) => { let res = Self::handle_query_results(qr); diff --git a/ng-verifier/src/rocksdb_user_storage.rs b/ng-verifier/src/rocksdb_user_storage.rs index e3f7aec5..7c12fec6 100644 --- a/ng-verifier/src/rocksdb_user_storage.rs +++ b/ng-verifier/src/rocksdb_user_storage.rs @@ -80,6 +80,10 @@ impl UserStorage for RocksDbUserStorage { RepoStorage::update_signer_cap(signer_cap, &self.user_storage) } + fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError> { + RepoStorage::update_inbox_cap(repo_id, overlay, priv_key, &self.user_storage) + } + fn update_certificate( &self, repo_id: &RepoId, @@ -92,6 +96,10 @@ impl UserStorage for RocksDbUserStorage { RepoStorage::open(repo_id, &self.user_storage)?.get_signer_cap() } + fn get_inbox_cap(&self, repo_id: &RepoId) -> Result { + RepoStorage::open(repo_id, &self.user_storage)?.get_inbox_cap() + } + fn update_branch_current_heads( &self, _repo_id: &RepoId, diff --git a/ng-verifier/src/site.rs b/ng-verifier/src/site.rs index 230de251..80801a3d 100644 --- a/ng-verifier/src/site.rs +++ b/ng-verifier/src/site.rs @@ -123,6 +123,11 @@ impl SiteV0 { let public_store_update: StoreUpdate = public_repo.store.as_ref().into(); signer_caps.push(public_repo.signer.to_owned().unwrap()); + // Creating the Inbox commit body about public store. + let public_store_inbox_commit_body = + CommitBody::V0(CommitBodyV0::AddInboxCap( + AddInboxCap::new_v0(public_repo.id, public_repo.store.outer_overlay(), public_repo.inbox.to_owned().unwrap()))); + let protected_repo = verifier .new_store_default( &site_pubkey, @@ -136,6 +141,11 @@ impl SiteV0 { let protected_store_update: StoreUpdate = protected_repo.store.as_ref().into(); signer_caps.push(protected_repo.signer.to_owned().unwrap()); + // Creating the Inbox commit body about protected store. + let protected_store_inbox_commit_body = + CommitBody::V0(CommitBodyV0::AddInboxCap( + AddInboxCap::new_v0(protected_repo.id, protected_repo.store.outer_overlay(),protected_repo.inbox.to_owned().unwrap()))); + let private_repo = verifier .new_store_default( &site_pubkey, @@ -179,16 +189,42 @@ impl SiteV0 { &private_repo.store, )?; - let mut current_head = protected_store_update_commit.reference().unwrap(); + // Creating the Inbox commit about public store. + let public_store_inbox_commit = Commit::new_with_body_acks_deps_and_save( + &user_priv_key, + &site_pubkey, + user_branch.id, + QuorumType::NoSigning, + vec![], + vec![protected_store_update_commit.reference().unwrap()], + public_store_inbox_commit_body, + &private_repo.store, + )?; + + // Creating the Inbox commit about protected store. + let protected_store_inbox_commit = Commit::new_with_body_acks_deps_and_save( + &user_priv_key, + &site_pubkey, + user_branch.id, + QuorumType::NoSigning, + vec![], + vec![public_store_inbox_commit.reference().unwrap()], + protected_store_inbox_commit_body, + &private_repo.store, + )?; + + let mut current_head = protected_store_inbox_commit.reference().unwrap(); let private_repo_id = private_repo.id; let private_store_repo = private_repo.store.get_store_repo().clone(); let private_repo_read_cap = private_repo.read_cap.to_owned().unwrap(); // Creating the AddSignerCap for each store - let mut commits = Vec::with_capacity(5); + let mut commits = Vec::with_capacity(7); commits.push((public_store_update_commit, vec![])); commits.push((protected_store_update_commit, vec![])); + commits.push((public_store_inbox_commit, vec![])); + commits.push((protected_store_inbox_commit, vec![])); for cap in signer_caps { let add_signer_cap_commit_body = CommitBody::V0(CommitBodyV0::AddSignerCap( diff --git a/ng-verifier/src/user_storage/repo.rs b/ng-verifier/src/user_storage/repo.rs index 06d4e869..3acbb25a 100644 --- a/ng-verifier/src/user_storage/repo.rs +++ b/ng-verifier/src/user_storage/repo.rs @@ -46,6 +46,7 @@ impl<'a> RepoStorage<'a> { const CHAT_BRANCH: u8 = b'c'; const DEFINITION: u8 = b'd'; const STORE_BRANCH: u8 = b'e'; + const CERTIFICATE: u8 = b'f'; const INHERIT: u8 = b'i'; const OVERLAY_BRANCH: u8 = b'l'; const MAIN_BRANCH: u8 = b'm'; @@ -57,10 +58,11 @@ impl<'a> RepoStorage<'a> { //const SIGNER_CAP_TOTAL: u8 = b't'; const USER_BRANCH: u8 = b'u'; const WRITE_CAP_SECRET: u8 = b'w'; - const CERTIFICATE: u8 = b'f'; + const INBOX_CAP: u8 = b'x'; - const ALL_PROPERTIES: [u8; 15] = [ + const ALL_PROPERTIES: [u8; 16] = [ Self::SIGNER_CAP, + Self::INBOX_CAP, //Self::SIGNER_CAP_PARTIAL, Self::CHAT_BRANCH, Self::DEFINITION, @@ -107,6 +109,7 @@ impl<'a> RepoStorage<'a> { repo.read_cap.as_ref().unwrap(), repo.write_cap.as_ref(), repo.signer.as_ref(), + repo.inbox.as_ref(), repo.store.get_store_repo(), &repo.repo_def, &repo.branches, @@ -156,6 +159,21 @@ impl<'a> RepoStorage<'a> { Ok(()) } + pub fn update_inbox_cap( + repo_id: &RepoId, + overlay: &OverlayId, + priv_key: &PrivKey, + storage: &'a dyn KCVStorage, + ) -> Result<(), StorageError> { + storage.write_transaction(&mut |tx| { + let id_ser = to_vec(repo_id)?; + let value = to_vec(priv_key)?; + tx.put(Self::PREFIX, &id_ser, Some(Self::INBOX_CAP), &value, &None)?; + Ok(()) + })?; + Ok(()) + } + pub fn update_certificate( id: &RepoId, certificate: &ObjectRef, @@ -186,11 +204,22 @@ impl<'a> RepoStorage<'a> { Ok(from_slice(&ser)?) } + pub fn get_inbox_cap(&self) -> Result { + let ser = self.storage.get( + Self::PREFIX, + &to_vec(&self.id).unwrap(), + Some(Self::INBOX_CAP), + &None, + )?; + Ok(from_slice(&ser)?) + } + pub fn create( id: &RepoId, read_cap: &ReadCap, write_cap: Option<&RepoWriteCapSecret>, signer_cap: Option<&SignerCap>, + inbox_cap: Option<&PrivKey>, store_repo: &StoreRepo, repo_def: &Repository, branches: &HashMap, @@ -246,6 +275,10 @@ impl<'a> RepoStorage<'a> { let value = to_vec(sc)?; tx.put(Self::PREFIX, &id_ser, Some(Self::SIGNER_CAP), &value, &None)?; } + if let Some(ic) = inbox_cap { + let value = to_vec(ic)?; + tx.put(Self::PREFIX, &id_ser, Some(Self::INBOX_CAP), &value, &None)?; + } for branch in branches.keys() { let mut branch_ser = to_vec(branch)?; let mut key = Vec::with_capacity(id_ser.len() + branch_ser.len()); @@ -326,6 +359,7 @@ impl<'a> RepoStorage<'a> { read_cap: prop(Self::READ_CAP, &props)?, write_cap: prop(Self::WRITE_CAP_SECRET, &props).ok(), signer: prop(Self::SIGNER_CAP, &props).ok(), + inbox: prop(Self::INBOX_CAP, &props).ok(), //TODO: members members: HashMap::new(), branches, diff --git a/ng-verifier/src/user_storage/storage.rs b/ng-verifier/src/user_storage/storage.rs index eaa91a52..25bac80a 100644 --- a/ng-verifier/src/user_storage/storage.rs +++ b/ng-verifier/src/user_storage/storage.rs @@ -42,6 +42,8 @@ pub trait UserStorage: Send + Sync { fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError>; + fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError>; + fn update_certificate( &self, repo_id: &RepoId, @@ -50,6 +52,8 @@ pub trait UserStorage: Send + Sync { fn get_signer_cap(&self, repo_id: &RepoId) -> Result; + fn get_inbox_cap(&self, repo_id: &RepoId) -> Result; + fn branch_add_file( &self, commit_id: ObjectId, @@ -86,6 +90,7 @@ pub(crate) struct InMemoryUserStorage { branch_files: RwLock>>, branch_discrete_state: RwLock>>, repo_signer_cap: RwLock>, + repo_inbox_cap: RwLock>, } impl InMemoryUserStorage { @@ -94,6 +99,7 @@ impl InMemoryUserStorage { branch_files: RwLock::new(HashMap::new()), branch_discrete_state: RwLock::new(HashMap::new()), repo_signer_cap: RwLock::new(HashMap::new()), + repo_inbox_cap: RwLock::new(HashMap::new()), } } } @@ -184,12 +190,23 @@ impl UserStorage for InMemoryUserStorage { lock.insert(signer_cap.repo, signer_cap.clone()); Ok(()) } - + fn get_signer_cap(&self, repo_id: &RepoId) -> Result { let mut lock = self.repo_signer_cap.write().unwrap(); Ok(lock.remove(repo_id).ok_or(StorageError::NotFound)?) } + fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError> { + let mut lock = self.repo_inbox_cap.write().unwrap(); + lock.insert(*repo_id, priv_key.clone()); + Ok(()) + } + + fn get_inbox_cap(&self, repo_id: &RepoId) -> Result { + let mut lock = self.repo_inbox_cap.write().unwrap(); + Ok(lock.remove(repo_id).ok_or(StorageError::NotFound)?) + } + fn update_branch_current_heads( &self, _repo_id: &RepoId, diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 498911b9..10ed56a6 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -98,6 +98,7 @@ pub struct Verifier { inner_to_outer: HashMap, pub(crate) outer: String, pub(crate) repos: HashMap, + inboxes: HashMap, // TODO: deal with collided repo_ids. self.repos should be a HashMap enum Collision {Yes, No(Repo)} // add a collided_repos: HashMap<(OverlayId, RepoId), Repo> // only use get_repo() everywhere in the code (always passing the overlay) so that collisions can be handled. @@ -509,6 +510,7 @@ impl Verifier { last_reservation: SystemTime::UNIX_EPOCH, stores: HashMap::new(), repos: HashMap::new(), + inboxes: HashMap::new(), topics: HashMap::new(), in_memory_outbox: vec![], inner_to_outer: HashMap::new(), @@ -1280,6 +1282,27 @@ impl Verifier { // ); // discarding error. } + + // registering inbox for protected and public store. (FIXME: this should be done instead in the 1st connection during wallet creation) + let remote = self.connected_broker.connected_or_err()?; + let mut done = false; + for (_,store) in self.stores.iter() { + if store.id() == self.protected_store_id() || store.id() == self.public_store_id() { + let repo = self.get_repo( store.id(), &store.get_store_repo())?; + let inbox = repo.inbox.to_owned().unwrap(); + // sending InboxRegister + let msg = InboxRegister::new(inbox, store.outer_overlay())?; + broker + .request::(&Some(user), &remote, msg) + .await?; + if !done { + done = true; + } else { + break; + } + } + } + Ok(()) } @@ -1543,10 +1566,51 @@ impl Verifier { Ok(()) } + pub async fn inbox(&mut self, msg: InboxMsg, from_queue: bool) { + + log_info!("RECEIVED INBOX MSG {:?}", msg); + + match self.inboxes.get(&msg.body.to_inbox) { + Some(repo_id) => { + match self.repos.get(repo_id) { + Some(repo) => { + if let Some(privkey) = &repo.inbox { + match msg.get_content(privkey) { + Ok(content) => { + log_info!("received msg content {:?}", content); + let res = self.process_inbox(msg, content).await; + if let Err(e) = res { + log_err!("Error during process_inbox {e}"); + } + }, + Err(e) => { + log_err!("cannot unseal inbox msg {e}"); + } + } + } + }, + None => {} + } + }, + None => {} + } + + if from_queue && self.connected_broker.is_some(){ + log_info!("try to pop one more inbox msg"); + // try to pop inbox msg + let connected_broker = self.connected_broker.clone(); + let broker = BROKER.read().await; + let user = self.user_id().clone(); + let _ = broker + .send_client_event(&Some(user), &connected_broker.into(), ClientEvent::InboxPopRequest) + .await; + } + } + pub async fn deliver(&mut self, event: Event, overlay: OverlayId) { - let event_str = event.to_string(); + //let event_str = event.to_string(); if let Err(e) = self.deliver_(event, overlay).await { - log_err!("DELIVERY ERROR {} {}", e, event_str); + log_err!("DELIVERY ERROR {}", e); } } @@ -1624,6 +1688,7 @@ impl Verifier { CommitBodyV0::SyncSignature(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddBranch(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::StoreUpdate(a) => a.verify(commit, self, branch_id, repo_id, store), + CommitBodyV0::AddInboxCap(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddSignerCap(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddFile(a) => a.verify(commit, self, branch_id, repo_id, store), CommitBodyV0::AddRepo(a) => a.verify(commit, self, branch_id, repo_id, store), @@ -1723,6 +1788,26 @@ impl Verifier { Ok(()) } + pub(crate) fn update_inbox_cap_v0( + &mut self, + inbox_cap: &AddInboxCapV0, + ) -> Result<(), VerifierError> { + let storage = match self.repos.get_mut(&inbox_cap.repo_id) { + Some(repo) => { + repo.inbox = Some(inbox_cap.priv_key.clone()); + log_info!("INBOX for {} : {}", inbox_cap.repo_id.to_string(), inbox_cap.priv_key.to_pub().to_string()); + self.inboxes.insert(inbox_cap.priv_key.to_pub(), repo.id); + self.user_storage_if_persistent() + } + None => self.user_storage(), + }; + if let Some(user_storage) = storage { + user_storage.update_inbox_cap(&inbox_cap.repo_id, &inbox_cap.overlay, &inbox_cap.priv_key)?; + } + + Ok(()) + } + pub(crate) fn update_signer_cap( &mut self, signer_cap: &SignerCap, @@ -2599,6 +2684,7 @@ impl Verifier { last_seq_num: 0, stores: HashMap::new(), repos: HashMap::new(), + inboxes: HashMap::new(), topics: HashMap::new(), in_memory_outbox: vec![], inner_to_outer: HashMap::new(), @@ -2660,6 +2746,10 @@ impl Verifier { fn add_repo_(&mut self, repo: Repo) -> &Repo { //self.populate_topics(&repo); let _ = self.add_doc(&repo.id, &repo.store.overlay_id); + if repo.inbox.is_some() { + log_info!("INBOX for {} : {}", repo.id.to_string(), repo.inbox.as_ref().unwrap().to_pub().to_string()); + _ = self.inboxes.insert(repo.inbox.as_ref().unwrap().to_pub(), repo.id); + } let repo_ref = self.repos.entry(repo.id).or_insert(repo); repo_ref } @@ -2731,7 +2821,7 @@ impl Verifier { }; let overlay_id = store_repo.overlay_id_for_storage_purpose(); let block_storage = self.get_arc_block_storage()?; - let store = self.stores.entry(overlay_id).or_insert_with(|| { + let store: &mut Arc = self.stores.entry(overlay_id).or_insert_with(|| { let store_readcap = ReadCap::nil(); // temporarily set the store_overlay_branch_readcap to an objectRef that has an empty id, and a key = to the repo_write_cap_secret let store_overlay_branch_readcap = @@ -2744,7 +2834,7 @@ impl Verifier { ); Arc::new(store) }); - let (repo, proto_events) = Arc::clone(store).create_repo_with_keys( + let (mut repo, proto_events) = Arc::clone(store).create_repo_with_keys( creator, creator_priv_key, priv_key, @@ -2753,6 +2843,9 @@ impl Verifier { None, private, )?; + if !private { + repo.inbox = Some(PrivKey::random_ed()); + } let repo = self.complete_site_store(store_repo, repo)?; self.populate_topics(&repo); self.new_events_with_repo(proto_events, &repo).await?; @@ -2782,7 +2875,7 @@ impl Verifier { // send AddSignerCap to User branch of private store let add_signer_cap_commit_body = CommitBody::V0(CommitBodyV0::AddSignerCap( AddSignerCap::V0(AddSignerCapV0 { - cap: repo.signer.as_ref().unwrap().clone(), + cap: repo.signer.to_owned().unwrap(), metadata: vec![], }), )); diff --git a/ngcli/src/main.rs b/ngcli/src/main.rs index 3e75693a..9e6da5c4 100644 --- a/ngcli/src/main.rs +++ b/ngcli/src/main.rs @@ -750,7 +750,7 @@ async fn main_inner() -> Result<(), NgcliError> { peer_pubk, config_v0.peer_id, config_v0.user.as_ref().unwrap().to_pub(), - config_v0.user.as_ref().unwrap().clone(), + config_v0.user.to_owned().unwrap(), BindAddress { port: config_v0.port, ip: (&config_v0.ip).into(),