Compare commits

...

6 Commits

  1. 2
      app/nextgraph/src-tauri/tauri.conf.json
  2. 24
      engine/broker/src/rocksdb_server_storage.rs
  3. 29
      engine/broker/src/server_broker.rs
  4. 2
      engine/broker/src/server_storage/core/account.rs
  5. 6
      engine/broker/src/server_storage/core/inbox.rs
  6. 15
      engine/net/src/actors/client/client_event.rs
  7. 4
      engine/net/src/actors/client/inbox_post.rs
  8. 11
      engine/net/src/actors/client/inbox_register.rs
  9. 8
      engine/net/src/broker.rs
  10. 20
      engine/net/src/bsps.rs
  11. 24
      engine/net/src/connection.rs
  12. 8
      engine/net/src/server_broker.rs
  13. 160
      engine/net/src/types.rs
  14. 23
      engine/oxigraph/src/oxigraph/sparql/dataset.rs
  15. 9
      engine/oxigraph/src/oxigraph/storage/mod.rs
  16. 23
      engine/repo/src/commit.rs
  17. 8
      engine/repo/src/file.rs
  18. 2
      engine/repo/src/kcv_storage.rs
  19. 3
      engine/repo/src/store.rs
  20. 11
      engine/repo/src/types.rs
  21. 8
      engine/repo/src/utils.rs
  22. 9
      engine/storage-rocksdb/src/kcv_storage.rs
  23. 2
      engine/verifier/src/commits/transaction.rs
  24. 425
      engine/verifier/src/inbox_processor.rs
  25. 1
      engine/verifier/src/orm/add_remove_triples.rs
  26. 307
      engine/verifier/src/orm/mod.rs
  27. 5
      engine/verifier/src/orm/types.rs
  28. 4
      engine/verifier/src/orm/validation.rs
  29. 7
      engine/verifier/src/rocksdb_user_storage.rs
  30. 14
      engine/verifier/src/site.rs
  31. 20
      engine/verifier/src/types.rs
  32. 14
      engine/verifier/src/user_storage/storage.rs
  33. 28
      engine/wallet/src/permissions.rs
  34. 15
      engine/wallet/src/types.rs
  35. 68
      infra/ngnet/src/main.rs

@ -14,7 +14,7 @@
"windows": [
{
"title": "NextGraph",
"width": 800,
"width": 1024,
"height": 600
}
],

@ -278,14 +278,17 @@ impl RocksDbServerStorage {
inv.del()?;
Ok(())
}
pub(crate) fn get_inboxes_for_readers(&self, user: &UserId) -> Result<HashSet<(PubKey, OverlayId)>,StorageError> {
pub(crate) fn get_inboxes_for_readers(
&self,
user: &UserId,
) -> Result<HashSet<(PubKey, OverlayId)>, StorageError> {
AccountStorage::load_inboxes(user, &self.core_storage)
}
pub(crate) fn take_first_msg_from_inbox(
&self,
inbox: &PubKey,
overlay: &OverlayId
overlay: &OverlayId,
) -> Result<InboxMsg, StorageError> {
InboxStorage::take_first_msg(inbox, overlay, &self.core_storage)
}
@ -293,21 +296,24 @@ impl RocksDbServerStorage {
pub(crate) fn get_readers_for_inbox(
&self,
inbox: &PubKey,
overlay: &OverlayId
overlay: &OverlayId,
) -> Result<HashSet<UserId>, StorageError> {
InboxStorage::load_readers(inbox, overlay, &self.core_storage)
}
pub(crate) fn register_inbox_reader(&self, user_id: UserId, inbox_id: PubKey, overlay: OverlayId) -> Result<(), StorageError> {
pub(crate) fn register_inbox_reader(
&self,
user_id: UserId,
inbox_id: PubKey,
overlay: OverlayId,
) -> Result<(), StorageError> {
InboxStorage::register_reader(&inbox_id, &overlay, &user_id, &self.core_storage)?;
AccountStorage::add_inbox(&user_id, inbox_id, overlay, &self.core_storage)
}
pub(crate) fn enqueue_inbox_msg(
&self,
msg: &InboxMsg
) -> Result<(), StorageError> {
InboxStorage::open(&msg.body.to_inbox, &msg.body.to_overlay, &self.core_storage)?.enqueue_msg(msg)
pub(crate) fn enqueue_inbox_msg(&self, msg: &InboxMsg) -> Result<(), StorageError> {
InboxStorage::open(&msg.body.to_inbox, &msg.body.to_overlay, &self.core_storage)?
.enqueue_msg(msg)
}
pub(crate) fn get_repo_pin_status(

@ -823,39 +823,40 @@ impl IServerBroker for ServerBroker {
}
async fn inbox_post(&self, post: InboxPost) -> Result<(), ServerError> {
// TODO: deal with Inbox that is not local to the broker (use Core protocol to dispatch it)
let users = self.storage.get_readers_for_inbox(&post.msg.body.to_inbox, &post.msg.body.to_overlay)?;
let users = self
.storage
.get_readers_for_inbox(&post.msg.body.to_inbox, &post.msg.body.to_overlay)?;
if users.is_empty() {
self.storage.enqueue_inbox_msg(&post.msg)?;
return Ok(())
return Ok(());
}
let broker = BROKER.read().await;
let not_dispatched = broker
.dispatch_inbox_msg(&users, post.msg)
.await?;
let not_dispatched = broker.dispatch_inbox_msg(&users, post.msg).await?;
if let Some(msg) = not_dispatched {
self.storage.enqueue_inbox_msg(&msg)?;
}
Ok(())
}
fn inbox_register(&self, user_id: UserId, registration: InboxRegister) -> Result<(), ServerError> {
self.storage.register_inbox_reader(user_id, registration.inbox_id, registration.overlay)?;
fn inbox_register(
&self,
user_id: UserId,
registration: InboxRegister,
) -> Result<(), ServerError> {
self.storage
.register_inbox_reader(user_id, registration.inbox_id, registration.overlay)?;
Ok(())
}
async fn inbox_pop_for_user(&self, user: UserId ) -> Result<InboxMsg, ServerError> {
async fn inbox_pop_for_user(&self, user: UserId) -> Result<InboxMsg, ServerError> {
let inboxes = self.storage.get_inboxes_for_readers(&user)?;
for (inbox,overlay) in inboxes {
for (inbox, overlay) in inboxes {
match self.storage.take_first_msg_from_inbox(&inbox, &overlay) {
Ok(msg) => {
return Ok(msg)
},
Ok(msg) => return Ok(msg),
Err(_) => {}
}
}

@ -82,7 +82,7 @@ impl<'a> AccountStorage<'a> {
storage: &'a dyn KCVStorage,
) -> Result<(), StorageError> {
let mut opening = Self::new(user, storage);
Self::INBOXES.add(&mut opening, &(inbox,overlay))
Self::INBOXES.add(&mut opening, &(inbox, overlay))
}
pub fn create(

@ -102,11 +102,11 @@ impl<'a> InboxStorage<'a> {
}
pub fn enqueue_msg(&mut self, msg: &InboxMsg) -> Result<(), StorageError> {
let (sec,nano) = now_precise_timestamp();
let (sec, nano) = now_precise_timestamp();
let mut hasher = DefaultHasher::new();
msg.body.hash(&mut hasher);
let key = (sec,nano, hasher.finish());
Self::MSGS.add(self, &key,msg)
let key = (sec, nano, hasher.finish());
Self::MSGS.add(self, &key, msg)
}
pub fn create(

@ -31,7 +31,6 @@ impl ClientEvent {
impl TryFrom<ProtocolMessage> for ClientEvent {
type Error = ProtocolError;
fn try_from(msg: ProtocolMessage) -> Result<Self, Self::Error> {
if let ProtocolMessage::ClientMessage(ClientMessage::V0(ClientMessageV0 {
content: ClientMessageContentV0::ClientEvent(e),
..
@ -50,7 +49,7 @@ impl From<ClientEvent> for ProtocolMessage {
ProtocolMessage::ClientMessage(ClientMessage::V0(ClientMessageV0 {
content: ClientMessageContentV0::ClientEvent(e),
overlay: OverlayId::nil(),
padding: vec![]
padding: vec![],
}))
}
}
@ -68,10 +67,9 @@ impl EActor for Actor<'_, ClientEvent, ()> {
match req {
ClientEvent::InboxPopRequest => {
let sb = { BROKER.read().await.get_server_broker()? };
let user = {fsm.lock().await.user_id()?};
let res: Result<InboxMsg, ServerError> = {
sb.read().await.inbox_pop_for_user(user).await
};
let user = { fsm.lock().await.user_id()? };
let res: Result<InboxMsg, ServerError> =
{ sb.read().await.inbox_pop_for_user(user).await };
if let Ok(msg) = res {
let _ = fsm
@ -81,7 +79,10 @@ impl EActor for Actor<'_, ClientEvent, ()> {
ClientMessageV0 {
overlay: msg.body.to_overlay.clone(),
padding: vec![],
content: ClientMessageContentV0::InboxReceive{msg, from_queue: true},
content: ClientMessageContentV0::InboxReceive {
msg,
from_queue: true,
},
},
)))
.await;

@ -61,9 +61,7 @@ impl EActor for Actor<'_, InboxPost, ()> {
) -> Result<(), ProtocolError> {
let req = InboxPost::try_from(msg)?;
let sb = { BROKER.read().await.get_server_broker()? };
let res: Result<(), ServerError> = sb
.read()
.await.inbox_post(req).await;
let res: Result<(), ServerError> = sb.read().await.inbox_post(req).await;
fsm.lock()
.await

@ -66,9 +66,12 @@ impl EActor for Actor<'_, InboxRegister, ()> {
if verify(&req.challenge, req.sig, req.inbox_id).is_err() {
fsm.lock()
.await
.send_in_reply_to(Result::<(), _>::Err(ServerError::InvalidSignature).into(), self.id())
.send_in_reply_to(
Result::<(), _>::Err(ServerError::InvalidSignature).into(),
self.id(),
)
.await?;
return Ok(())
return Ok(());
}
let sb = { BROKER.read().await.get_server_broker()? };
@ -78,9 +81,7 @@ impl EActor for Actor<'_, InboxRegister, ()> {
fsm.user_id()?
};
let res: Result<(), ServerError> = sb
.read()
.await.inbox_register(user_id, req);
let res: Result<(), ServerError> = sb.read().await.inbox_register(user_id, req);
fsm.lock()
.await

@ -124,7 +124,7 @@ pub enum LocalBrokerMessage {
Disconnected {
user_id: UserId,
},
Inbox{
Inbox {
user_id: UserId,
msg: InboxMsg,
from_queue: bool,
@ -1218,7 +1218,6 @@ impl Broker {
users: &HashSet<UserId>,
msg: InboxMsg,
) -> Result<Option<InboxMsg>, ServerError> {
for user in users.iter() {
if let Some(peers) = self.users_peers.get(user) {
for peer in peers.iter() {
@ -1236,7 +1235,10 @@ impl Broker {
ClientMessageV0 {
overlay: msg.body.to_overlay.clone(),
padding: vec![],
content: ClientMessageContentV0::InboxReceive{msg, from_queue: false},
content: ClientMessageContentV0::InboxReceive {
msg,
from_queue: false,
},
},
)))
.await;

@ -1,6 +1,6 @@
use time::{Month,Date};
use std::collections::HashMap;
use lazy_static::lazy_static;
use std::collections::HashMap;
use time::{Date, Month};
pub struct BSPDetail<'a> {
pub domain: &'a str,
@ -21,17 +21,21 @@ lazy_static! {
pub static ref BSP_DETAILS: HashMap<&'static str, BSPDetail<'static>> = {
let mut d = HashMap::new();
d.insert("https://nextgraph.eu", BSPDetail {
d.insert(
"https://nextgraph.eu",
BSPDetail {
domain: "nextgraph.eu",
country: "de",
sysadmin: "team@nextgraph.org",
owned: false,
since: Date::from_calendar_date(2024, Month::September,2).unwrap(),
since: Date::from_calendar_date(2024, Month::September, 2).unwrap(),
has_free: true,
has_paid: false,
official: true,
description: "First official Broker Service Provider from NextGraph.org. Based in Europe."
});
description:
"First official Broker Service Provider from NextGraph.org. Based in Europe.",
},
);
assert!(d.insert("https://nextgraph.one", BSPDetail {
domain: "nextgraph.one",
@ -47,7 +51,5 @@ lazy_static! {
d
};
pub static ref BSP_ORIGINS: Vec<&'static str> = {
BSP_DETAILS.keys().cloned().collect()
};
pub static ref BSP_ORIGINS: Vec<&'static str> = { BSP_DETAILS.keys().cloned().collect() };
}

@ -1046,8 +1046,11 @@ impl NoiseFSM {
})
.await;
return Ok(StepReply::NONE);
},
ClientMessageContentV0::InboxReceive{msg, from_queue} => {
}
ClientMessageContentV0::InboxReceive {
msg,
from_queue,
} => {
let _ = BROKER
.read()
.await
@ -1055,12 +1058,12 @@ impl NoiseFSM {
.send(LocalBrokerMessage::Inbox {
msg,
user_id: self.user_id()?,
from_queue
from_queue,
})
.await;
return Ok(StepReply::NONE);
}
_ => {},
_ => {}
},
}
}
@ -1326,9 +1329,18 @@ impl ConnectionBase {
pub async fn send_client_event<
A: Into<ProtocolMessage> + std::fmt::Debug + Sync + Send + 'static,
>(&self, msg: A) -> Result<(), NgError> {
>(
&self,
msg: A,
) -> Result<(), NgError> {
let proto_msg: ProtocolMessage = msg.into();
self.fsm.as_ref().unwrap().lock().await.send(proto_msg).await?;
self.fsm
.as_ref()
.unwrap()
.lock()
.await
.send(proto_msg)
.await?;
Ok(())
}

@ -41,8 +41,12 @@ pub trait IServerBroker: Send + Sync {
rendezvous: SymKey,
) -> Receiver<Result<ExportedWallet, ServerError>>;
async fn inbox_post(&self, post: InboxPost) -> Result<(), ServerError>;
fn inbox_register(&self, user_id: UserId, registration: InboxRegister) -> Result<(), ServerError>;
async fn inbox_pop_for_user(&self, user: UserId ) -> Result<InboxMsg, ServerError>;
fn inbox_register(
&self,
user_id: UserId,
registration: InboxRegister,
) -> Result<(), ServerError>;
async fn inbox_pop_for_user(&self, user: UserId) -> Result<InboxMsg, ServerError>;
fn get_path_users(&self) -> PathBuf;
fn get_block_storage(&self) -> Arc<std::sync::RwLock<dyn BlockStorage + Send + Sync>>;
fn put_block(&self, overlay_id: &OverlayId, block: Block) -> Result<(), ServerError>;

@ -27,12 +27,15 @@ use ng_repo::errors::*;
use ng_repo::log::*;
use ng_repo::store::Store;
use ng_repo::types::*;
use ng_repo::utils::{random_key, sign, verify, decode_digest, decode_key, decode_sym_key, decode_priv_key, decode_overlayid};
use ng_repo::utils::{
decode_digest, decode_key, decode_overlayid, decode_priv_key, decode_sym_key, random_key, sign,
verify,
};
use crate::app_protocol::*;
use crate::utils::{
get_domain_without_port_443, is_ipv4_private, is_ipv6_private, is_private_ip, is_public_ip,
is_public_ipv4, is_public_ipv6, decode_locator
decode_locator, get_domain_without_port_443, is_ipv4_private, is_ipv6_private, is_private_ip,
is_public_ip, is_public_ipv4, is_public_ipv6,
};
use crate::WS_PORT_ALTERNATE;
use crate::{actor::EActor, actors::admin::*, actors::*};
@ -483,12 +486,7 @@ impl BrokerServerV0 {
pub async fn get_url_for_ngnet(&self, ipv4: bool, ipv6: bool) -> Option<String> {
match &self.server_type {
BrokerServerTypeV0::Public(addrs) => {
Self::ng_app_bootstrap_url_with_first_ipv6_or_ipv4(
ipv4,
ipv6,
addrs,
self.peer_id,
)
Self::ng_app_bootstrap_url_with_first_ipv6_or_ipv4(ipv4, ipv6, addrs, self.peer_id)
}
BrokerServerTypeV0::BoxPublicDyn(addrs) => {
// let resp = reqwest::get(api_dyn_peer_url(&self.peer_id)).await;
@ -656,19 +654,21 @@ impl BrokerServerV0 {
}
pub fn to_iframe_msg(&self) -> BootstrapIframeMsg {
match &self.server_type {
BrokerServerTypeV0::Domain(domain) => BootstrapIframeMsg::domain(domain.clone()),
BrokerServerTypeV0::Localhost(port) => BootstrapIframeMsg::local(*port, self.peer_id),
BrokerServerTypeV0::BoxPrivate(addrs) => BootstrapIframeMsg::private(addrs.to_vec(), self.peer_id),
BrokerServerTypeV0::Public(_) | BrokerServerTypeV0::BoxPublicDyn(_) => BootstrapIframeMsg::ngbox(),
BrokerServerTypeV0::BoxPrivate(addrs) => {
BootstrapIframeMsg::private(addrs.to_vec(), self.peer_id)
}
BrokerServerTypeV0::Public(_) | BrokerServerTypeV0::BoxPublicDyn(_) => {
BootstrapIframeMsg::ngbox()
}
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct BootstrapIframeMsg {
pub peer_id: Option<String>,
pub private: Option<Vec<BindAddress>>,
@ -678,17 +678,16 @@ pub struct BootstrapIframeMsg {
pub domain: Option<String>,
pub localhost: Option<u16>,
}
impl BootstrapIframeMsg {
fn new() -> Self {
Self {
peer_id:None,
private:None,
ngbox:None,
domain:None,
localhost:None
peer_id: None,
private: None,
ngbox: None,
domain: None,
localhost: None,
}
}
@ -759,7 +758,10 @@ impl BootstrapContentV0 {
}
pub fn to_iframe_msgs(&self) -> Vec<BootstrapIframeMsg> {
self.servers.iter().map(|server| server.to_iframe_msg()).collect()
self.servers
.iter()
.map(|server| server.to_iframe_msg())
.collect()
}
}
@ -3639,24 +3641,23 @@ pub struct InboxRegister {
// TODO: obtain challenge from Broker
pub challenge: [u8; 32],
// signature of challenge by inbox privkey
pub sig: Sig
pub sig: Sig,
}
impl InboxRegister {
pub fn new(inbox: PrivKey, overlay: OverlayId) -> Result<Self,NgError> {
pub fn new(inbox: PrivKey, overlay: OverlayId) -> Result<Self, NgError> {
let challenge = random_key();
let inbox_id = inbox.to_pub();
let sig = sign(&inbox,&inbox_id, &challenge)?;
let sig = sign(&inbox, &inbox_id, &challenge)?;
Ok(Self {
inbox_id,
overlay,
challenge,
sig
sig,
})
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxPost {
pub msg: InboxMsg,
@ -3668,27 +3669,30 @@ impl InboxPost {
pub fn new(
to_overlay: OverlayId,
to_inbox: PubKey,
from: Option<(OverlayId,PrivKey)>,
content:&InboxMsgContent,
from: Option<(OverlayId, PrivKey)>,
content: &InboxMsgContent,
blocks: Vec<Block>,
to_broker: Option<Locator>
) -> Result<Self,NgError>
{
to_broker: Option<Locator>,
) -> Result<Self, NgError> {
Ok(Self {
msg: InboxMsg::new(to_overlay,to_inbox,from,content,blocks)?,
to_broker
msg: InboxMsg::new(to_overlay, to_inbox, from, content, blocks)?,
to_broker,
})
}
pub fn new_social_query_response(
to_overlay: OverlayId,
to_inbox: PubKey,
from: Option<(OverlayId,PrivKey)>,
from: Option<(OverlayId, PrivKey)>,
query_id: RepoId,
forwarder_id: RepoId,
content: SocialQueryResponseContent
) -> Result<Self,NgError> {
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content }));
content: SocialQueryResponseContent,
) -> Result<Self, NgError> {
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse {
query_id,
forwarder_id,
content,
}));
Self::new(to_overlay, to_inbox, from, &content, vec![], None)
}
@ -3697,14 +3701,20 @@ impl InboxPost {
request: &SocialQueryRequest,
content: SocialQueryResponseContent,
inbox_privkey: PrivKey,
) -> Result<Self,NgError> {
) -> Result<Self, NgError> {
let to_overlay = msg.from_overlay.ok_or(NgError::InvalidArgument)?;
let to_inbox = msg.from_inbox.ok_or(NgError::InvalidArgument)?;
if msg.to_inbox != inbox_privkey.to_pub() { return Err(NgError::InvalidArgument); }
if msg.to_inbox != inbox_privkey.to_pub() {
return Err(NgError::InvalidArgument);
}
let from = Some((msg.to_overlay, inbox_privkey));
let query_id = request.query_id;
let forwarder_id = request.forwarder_id;
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content }));
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse {
query_id,
forwarder_id,
content,
}));
Self::new(to_overlay, to_inbox, from, &content, vec![], None)
}
@ -3722,12 +3732,9 @@ impl InboxPost {
blocks: Vec<Block>,
degree: u16,
) -> Result<Self, NgError> {
// processing to_profile_nuri
let c = RE_PROFILE.captures(&to_profile_nuri);
if c.is_some()
&& c.as_ref().unwrap().get(1).is_some()
{
if c.is_some() && c.as_ref().unwrap().get(1).is_some() {
let cap = c.unwrap();
let o = cap.get(1).unwrap().as_str();
let to_profile_id = decode_key(o)?;
@ -3735,14 +3742,13 @@ impl InboxPost {
// processing to_inbox_nuri
let c = RE_INBOX.captures(&to_inbox_nuri);
if c.is_some()
&& c.as_ref().unwrap().get(1).is_some()
{
if c.is_some() && c.as_ref().unwrap().get(1).is_some() {
let cap = c.unwrap();
let d = cap.get(1).unwrap().as_str();
let to_inbox = decode_key(d)?;
let from_overlay = from_profile_store_repo.outer_overlay();
let content = InboxMsgContent::SocialQuery(SocialQuery::Request(SocialQueryRequest{
let content =
InboxMsgContent::SocialQuery(SocialQuery::Request(SocialQueryRequest {
query_id,
forwarder_id,
from_profile_store_repo,
@ -3753,10 +3759,10 @@ impl InboxPost {
return Ok(InboxPost::new(
to_overlay,
to_inbox,
Some((from_overlay,from_inbox)),
Some((from_overlay, from_inbox)),
&content,
blocks,
to_broker
to_broker,
)?);
}
}
@ -3771,27 +3777,29 @@ impl InboxPost {
to_broker: Option<Locator>,
with_readcap: bool,
name: String,
email: Option<String>
email: Option<String>,
) -> Result<Self, NgError> {
let from_overlay = from_profile_store_repo.outer_overlay();
let content = InboxMsgContent::ContactDetails(ContactDetails{
let content = InboxMsgContent::ContactDetails(ContactDetails {
profile: from_profile_store_repo,
read_cap: if with_readcap {unimplemented!();} else {None},
read_cap: if with_readcap {
unimplemented!();
} else {
None
},
name,
email
email,
});
return Ok(InboxPost::new(
to_overlay,
to_inbox,
Some((from_overlay,from_inbox)),
Some((from_overlay, from_inbox)),
&content,
vec![],
to_broker
to_broker,
)?);
}
}
/// Request to publish an event in pubsub
@ -4255,7 +4263,6 @@ pub enum InboxMsgContent {
/// InboxMsgBody
#[derive(Clone, Debug, Serialize, Deserialize, Hash, PartialEq)]
pub struct InboxMsgBody {
pub to_overlay: OverlayId,
pub to_inbox: PubKey,
@ -4270,7 +4277,6 @@ pub struct InboxMsgBody {
/// InboxMsg
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct InboxMsg {
pub body: InboxMsgBody,
/// optional signature by sender (from_inbox pubkey), over body
@ -4284,10 +4290,10 @@ impl InboxMsg {
pub fn new(
to_overlay: OverlayId,
to_inbox: PubKey,
from: Option<(OverlayId,PrivKey)>,
content:&InboxMsgContent,
blocks: Vec<Block>
) -> Result<Self,NgError> {
from: Option<(OverlayId, PrivKey)>,
content: &InboxMsgContent,
blocks: Vec<Block>,
) -> Result<Self, NgError> {
let ser = serde_bare::to_vec(content).unwrap();
let mut rng = crypto_box::aead::OsRng {};
let msg = crypto_box::seal(&mut rng, &to_inbox.to_dh_slice().into(), &ser)
@ -4295,27 +4301,18 @@ impl InboxMsg {
let body = InboxMsgBody {
to_overlay,
to_inbox,
from_overlay: from.as_ref().map(|(o,_)|o.clone()),
from_inbox: from.as_ref().map(|(_,i)|i.to_pub()),
msg
from_overlay: from.as_ref().map(|(o, _)| o.clone()),
from_inbox: from.as_ref().map(|(_, i)| i.to_pub()),
msg,
};
let sig = match from {
Some((_,inbox)) => {
Some((_, inbox)) => {
let ser = serde_bare::to_vec(&body).unwrap();
Some(sign(
&inbox,
body.from_inbox.as_ref().unwrap(),
&ser,
)?)},
None=>None
};
Ok(
Self {
body,
sig,
blocks
Some(sign(&inbox, body.from_inbox.as_ref().unwrap(), &ser)?)
}
)
None => None,
};
Ok(Self { body, sig, blocks })
}
pub fn get_content(&self, inbox_sk: &PrivKey) -> Result<InboxMsgContent, NgError> {
@ -4340,7 +4337,7 @@ pub enum ClientMessageContentV0 {
ClientResponse(ClientResponse),
ForwardedEvent(Event),
ForwardedBlock(Block),
InboxReceive{ msg: InboxMsg, from_queue: bool },
InboxReceive { msg: InboxMsg, from_queue: bool },
ClientEvent(ClientEvent),
}
impl ClientMessageContentV0 {
@ -5262,7 +5259,6 @@ impl NgQRCode {
}
}
// TODO: PermaLinks and InboxPost (and ExtRequests)
#[cfg(test)]

@ -80,12 +80,16 @@ impl DatasetView {
})
});
}
query_dataset
.available_named_graphs()
.map(|graphs| for nob in graphs { match nob {
NamedOrBlankNode::NamedNode(nn) => { res.encode_term(NamedNodeRef::new_unchecked(nn.as_str())); }
,_=>{}
} });
query_dataset.available_named_graphs().map(|graphs| {
for nob in graphs {
match nob {
NamedOrBlankNode::NamedNode(nn) => {
res.encode_term(NamedNodeRef::new_unchecked(nn.as_str()));
}
_ => {}
}
}
});
res
}
@ -98,9 +102,10 @@ impl DatasetView {
self.reader
.parse_graph_name(&graph_name_string, Some(*iri_id))
}
_ => Err(CorruptionError::msg(
format!("Invalid graph_name (not a NamedNode) in parse_graph_name {:?}", graph_name),
)
_ => Err(CorruptionError::msg(format!(
"Invalid graph_name (not a NamedNode) in parse_graph_name {:?}",
graph_name
))
.into()),
}
}

@ -808,11 +808,10 @@ impl StorageReader {
// TODO: check that all the commits are from the same branch
// TODO: if commits are exactly like current heads of branch, set at_current_heads = true (or if it is the main branch, use MatchBy::Repos)
MatchBy::Commits {
heads: HashSet::from_iter(
commits
.into_iter()
.map(|c| { let s = format!("{DID_PREFIX}{c}:v:{overlay}"); StrHash::new(&s) }),
),
heads: HashSet::from_iter(commits.into_iter().map(|c| {
let s = format!("{DID_PREFIX}{c}:v:{overlay}");
StrHash::new(&s)
})),
at_current_heads: false,
original_graph_name,
}

@ -312,14 +312,14 @@ impl Commit {
pub fn collect_block_ids(
commit_ref: ObjectRef,
store: &Store,
with_body: bool
with_body: bool,
) -> Result<Vec<BlockId>, CommitLoadError> {
let mut block_ids : Vec<BlockId>;
let mut block_ids: Vec<BlockId>;
let (id, key) = (commit_ref.id, commit_ref.key);
match Object::load(id, Some(key.clone()), store) {
Err(ObjectParseError::MissingHeaderBlocks((_, missing))) => {
return Err(CommitLoadError::MissingBlocks(missing));
},
}
Ok(obj) => {
let content = obj
.content()
@ -333,10 +333,15 @@ impl Commit {
if with_body {
let content = commit.content_v0();
let (id, key) = (content.body.id, content.body.key.clone());
let obj = Object::load(id.clone(), Some(key.clone()), store).map_err(|e| match e {
ObjectParseError::MissingBlocks(missing) => CommitLoadError::MissingBlocks(missing),
let obj =
Object::load(id.clone(), Some(key.clone()), store).map_err(
|e| match e {
ObjectParseError::MissingBlocks(missing) => {
CommitLoadError::MissingBlocks(missing)
}
_ => CommitLoadError::ObjectParseError,
})?;
},
)?;
let content = obj
.content()
.map_err(|_e| CommitLoadError::ObjectParseError)?;
@ -1586,7 +1591,11 @@ impl fmt::Display for CommitBody {
RemoveSignerCap(RemoveSignerCap),
WalletUpdate(WalletUpdate),
StoreUpdate(StoreUpdate), */
_ => write!(f, "!!!! CommitBody Display not implemented for {:?}", v0.type_id()),
_ => write!(
f,
"!!!! CommitBody Display not implemented for {:?}",
v0.type_id()
),
}
}
}

@ -1491,12 +1491,8 @@ mod test {
log_debug!("{}", file);
let file = File::open(
file.id().unwrap(),
file.key().to_owned().unwrap(),
store,
)
.expect("open");
let file =
File::open(file.id().unwrap(), file.key().to_owned().unwrap(), store).expect("open");
// this only works because we chose a big block size (1MB) so the small JPG file fits in one block.
// if not, we would have to call read repeatedly and append the results into a buffer, in order to get the full file

@ -1064,7 +1064,7 @@ pub trait ReadTransaction {
key_prefix: Vec<u8>,
suffix: Option<u8>,
family: &Option<String>,
) -> Result<(Vec<u8>,Vec<u8>), StorageError>;
) -> Result<(Vec<u8>, Vec<u8>), StorageError>;
/// Check if a specific value exists for a property from the store.
fn has_property_value(

@ -298,7 +298,8 @@ impl Store {
"data:container"
} else {
"social:profile"
}.to_string()
}
.to_string(),
));
}

@ -16,11 +16,11 @@ use std::cmp::Ordering;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use ng_threshold_crypto::serde_impl::SerdeSecret;
use ng_threshold_crypto::SignatureShare;
use once_cell::sync::OnceCell;
use sbbf_rs_safe::Filter;
use serde::{Deserialize, Serialize};
use ng_threshold_crypto::serde_impl::SerdeSecret;
use ng_threshold_crypto::SignatureShare;
use zeroize::{Zeroize, ZeroizeOnDrop};
use crate::errors::NgError;
@ -615,7 +615,7 @@ pub enum OverlayId {
impl Default for OverlayId {
fn default() -> Self {
OverlayId::Outer([0;32])
OverlayId::Outer([0; 32])
}
}
@ -1966,7 +1966,6 @@ pub enum RemoveLink {
V0(RemoveLinkV0),
}
/// Adds an Inbox Capability (privkey) into the user branch, so that a user can share with all its device.
///
/// DEPS to the previous AddInboxCap commit(s) if it is an update. in this case, repo_id should match
@ -1992,11 +1991,11 @@ pub enum AddInboxCap {
impl AddInboxCap {
pub fn new_v0(repo_id: RepoId, overlay: OverlayId, priv_key: PrivKey) -> Self {
Self::V0(AddInboxCapV0{
Self::V0(AddInboxCapV0 {
repo_id,
overlay,
priv_key,
metadata: vec![]
metadata: vec![],
})
}
}

@ -207,11 +207,9 @@ pub fn now_timestamp() -> Timestamp {
.unwrap()
}
pub fn now_precise_timestamp() -> (u64,u32) {
let dur = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap();
(dur.as_secs(),dur.subsec_nanos())
pub fn now_precise_timestamp() -> (u64, u32) {
let dur = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
(dur.as_secs(), dur.subsec_nanos())
}
/// returns a new NextGraph Timestamp equivalent to the duration after now.

@ -81,7 +81,7 @@ impl<'a> ReadTransaction for RocksdbTransaction<'a> {
key_prefix: Vec<u8>,
suffix: Option<u8>,
family: &Option<String>,
) -> Result<(Vec<u8>,Vec<u8>), StorageError> {
) -> Result<(Vec<u8>, Vec<u8>), StorageError> {
let property_start =
RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix);
let iter = self.get_iterator(&property_start, &family)?;
@ -236,7 +236,8 @@ impl<'a> WriteTransaction for RocksdbTransaction<'a> {
suffix: Option<u8>,
family: &Option<String>,
) -> Result<Vec<u8>, StorageError> {
let (key,value) = self.get_first_key_value(prefix, key_size, key_prefix, suffix, family)?;
let (key, value) =
self.get_first_key_value(prefix, key_size, key_prefix, suffix, family)?;
let key_without_prefix = key[1..].to_vec();
self.del(prefix, &key_without_prefix, suffix, family)?;
Ok(value)
@ -378,7 +379,7 @@ impl ReadTransaction for RocksDbKCVStorage {
key_prefix: Vec<u8>,
suffix: Option<u8>,
family: &Option<String>,
) -> Result<(Vec<u8>,Vec<u8>), StorageError> {
) -> Result<(Vec<u8>, Vec<u8>), StorageError> {
let property_start =
RocksDbKCVStorage::calc_key_start(prefix, key_size, &key_prefix, &suffix);
let iter = self.get_iterator(&property_start, &family)?;
@ -679,7 +680,7 @@ impl RocksDbKCVStorage {
key_prefix: Vec<u8>,
suffix: Option<u8>,
mut iter: DBIteratorWithThreadMode<'_, impl ng_rocksdb::DBAccess>,
) -> Result<(Vec<u8>,Vec<u8>), StorageError> {
) -> Result<(Vec<u8>, Vec<u8>), StorageError> {
if key_prefix.len() > key_size {
return Err(StorageError::InvalidValue);
}

@ -778,7 +778,7 @@ impl Verifier {
.await;
let graph_nuri =
NuriV0::repo_graph_name(&update.repo_id, &update.overlay_id);
self.orm_update(
self.orm_backend_update(
session_id,
update.repo_id.clone(),
update.overlay_id,

@ -17,23 +17,28 @@ use ng_net::broker::BROKER;
use ng_oxigraph::oxigraph::sparql::QueryResults;
use ng_oxigraph::oxrdf::{NamedNode, Term, Triple};
use ng_oxigraph::oxsdatatypes::DateTime;
use ng_repo::types::{Block, ObjectRef, OverlayId, PrivKey, ReadCap, RepoId, StoreRepo, StoreRepoV0};
use ng_repo::{errors::*, store::Store, types::Commit};
use ng_repo::log::*;
use ng_repo::types::{
Block, ObjectRef, OverlayId, PrivKey, ReadCap, RepoId, StoreRepo, StoreRepoV0,
};
use ng_repo::{errors::*, store::Store, types::Commit};
use ng_net::types::{InboxMsg, InboxMsgContent, InboxPost, SocialQuery, SocialQueryResponse, SocialQueryResponseContent};
use ng_net::app_protocol::*;
use ng_net::types::{
InboxMsg, InboxMsgContent, InboxPost, SocialQuery, SocialQueryResponse,
SocialQueryResponseContent,
};
use crate::verifier::*;
impl Verifier {
pub(crate) async fn post_to_inbox(&self, post: InboxPost) -> Result<(), VerifierError> {
//log_info!("post_to_inbox {:?}",post);
let res = match self.client_request::<_,()>(post).await
{
let res = match self.client_request::<_, ()>(post).await {
Err(e) => Err(VerifierError::InboxError(e.to_string())),
Ok(SoS::Stream(_)) => Err(VerifierError::InboxError(NgError::InvalidResponse.to_string())),
Ok(SoS::Stream(_)) => Err(VerifierError::InboxError(
NgError::InvalidResponse.to_string(),
)),
Ok(SoS::Single(_)) => Ok(()),
};
//log_info!("res {:?}",res);
@ -48,37 +53,54 @@ impl Verifier {
from_inbox_nuri_string: &String,
) -> Result<(String, NuriV0), VerifierError> {
// creating the ForwardedSocialQuery in the private store
let forwarder = self.doc_create_with_store_repo(
"Graph".to_string(), "social:query:forwarded".to_string(),
"store".to_string(), None // meaning in private store
).await?;
let forwarder = self
.doc_create_with_store_repo(
"Graph".to_string(),
"social:query:forwarded".to_string(),
"store".to_string(),
None, // meaning in private store
)
.await?;
let forwarder_nuri = NuriV0::new_from_repo_graph(&forwarder)?;
let forwarder_id = forwarder_nuri.target.repo_id().clone();
let forwarder_nuri_string = NuriV0::repo_id(&forwarder_id);
// adding triples in forwarder doc : ng:social_query_id
let sparql_update = format!(" PREFIX ng: <did:ng:x:ng#>
let sparql_update = format!(
" PREFIX ng: <did:ng:x:ng#>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
INSERT DATA {{ <> ng:social_query_id <{social_query_doc_nuri_string}>.
<> ng:social_query_forwarder <{from_forwarder_nuri_string}>.
<> ng:social_query_from_inbox <{from_inbox_nuri_string}>.
<> ng:social_query_from_profile <{from_profile_nuri_string}>.
<> ng:social_query_started \"{}\"^^xsd:dateTime . }}",DateTime::now());
<> ng:social_query_started \"{}\"^^xsd:dateTime . }}",
DateTime::now()
);
let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &Some(forwarder_nuri_string.clone()), vec![],0)
.process_sparql_update(
&forwarder_nuri,
&sparql_update,
&Some(forwarder_nuri_string.clone()),
vec![],
0,
)
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
}
Ok((forwarder_nuri_string,forwarder_nuri))
Ok((forwarder_nuri_string, forwarder_nuri))
}
pub(crate) async fn mark_social_query_forwarder(&mut self, forwarder_nuri_string: &String, forwarder_nuri: &NuriV0, predicate: String) -> Result<(), VerifierError> {
pub(crate) async fn mark_social_query_forwarder(
&mut self,
forwarder_nuri_string: &String,
forwarder_nuri: &NuriV0,
predicate: String,
) -> Result<(), VerifierError> {
// adding triples in forwarder doc : ng:social_query_id
let sparql_update = format!("INSERT DATA {{ <{forwarder_nuri_string}> <did:ng:x:ng#{predicate}> \"{}\"^^<http://www.w3.org/2001/XMLSchema#dateTime> . }}",DateTime::now());
let ret = self
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![],0)
.process_sparql_update(forwarder_nuri, &sparql_update, &None, vec![], 0)
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
@ -86,46 +108,67 @@ impl Verifier {
Ok(())
}
pub(crate) fn get_privkey_of_inbox(&self, this_overlay: &OverlayId) -> Result<PrivKey, VerifierError> {
pub(crate) fn get_privkey_of_inbox(
&self,
this_overlay: &OverlayId,
) -> Result<PrivKey, VerifierError> {
let store = self.get_store_by_overlay_id(this_overlay)?;
let repo = self.repos.get(&store.id()).ok_or(NgError::RepoNotFound)?;
let from_inbox = repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?;
Ok(from_inbox)
}
fn get_profile_replying_to(&self, from_profile: &String) -> Result<(OverlayId, PrivKey) ,NgError> {
fn get_profile_replying_to(
&self,
from_profile: &String,
) -> Result<(OverlayId, PrivKey), NgError> {
let from_profile_id = if from_profile.starts_with("did:ng:b") {
self.config.protected_store_id.unwrap()
} else {
self.config.public_store_id.unwrap()
};
let repo = self.repos.get(&from_profile_id).ok_or(NgError::RepoNotFound)?;
let repo = self
.repos
.get(&from_profile_id)
.ok_or(NgError::RepoNotFound)?;
let inbox = repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?;
let overlay = repo.store.get_store_repo().outer_overlay();
Ok( (overlay, inbox.clone()) )
Ok((overlay, inbox.clone()))
}
pub(crate) fn get_2_profiles(&self) -> Result<(
pub(crate) fn get_2_profiles(
&self,
) -> Result<
(
(StoreRepo, PrivKey), // public
(StoreRepo, PrivKey) // protected
) ,NgError> {
(StoreRepo, PrivKey), // protected
),
NgError,
> {
let protected_store_id = self.config.protected_store_id.unwrap();
let protected_repo = self.repos.get(&protected_store_id).ok_or(NgError::RepoNotFound)?;
let protected_inbox = protected_repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?;
let protected_repo = self
.repos
.get(&protected_store_id)
.ok_or(NgError::RepoNotFound)?;
let protected_inbox = protected_repo
.inbox
.to_owned()
.ok_or(NgError::InboxNotFound)?;
let protected_store_repo = protected_repo.store.get_store_repo();
let public_store_id = self.config.public_store_id.unwrap();
let public_repo = self.repos.get(&public_store_id).ok_or(NgError::RepoNotFound)?;
let public_repo = self
.repos
.get(&public_store_id)
.ok_or(NgError::RepoNotFound)?;
let public_inbox = public_repo.inbox.to_owned().ok_or(NgError::InboxNotFound)?;
let public_store_repo = public_repo.store.get_store_repo();
Ok((
(*public_store_repo, public_inbox.clone()),
(*protected_store_repo, protected_inbox.clone())
(*protected_store_repo, protected_inbox.clone()),
))
}
@ -137,23 +180,24 @@ impl Verifier {
forwarder_id: &RepoId,
from_profiles: &(
(StoreRepo, PrivKey), // public
(StoreRepo, PrivKey) // protected
(StoreRepo, PrivKey), // protected
),
query_id: &RepoId,
definition_commit_body_ref: &ObjectRef,
blocks: &Vec<Block>,
degree: u16,
) -> Result<(), VerifierError> {
// first add an entry in the local forwarded social query, to monitor progress
let sparql_update = format!("
let sparql_update = format!(
"
PREFIX ng: <did:ng:x:ng#>
INSERT DATA {{
<did:ng:_> ng:social_query_forwarded_to_profile <{to_profile_nuri}> .
<did:ng:_> ng:social_query_forwarded_to_inbox <{to_inbox_nuri}> .
}}");
}}"
);
let ret = self
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![],0)
.process_sparql_update(&forwarder_nuri, &sparql_update, &None, vec![], 0)
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
@ -177,7 +221,8 @@ impl Verifier {
definition_commit_body_ref.clone(),
blocks.to_vec(),
degree,
)?).await?;
)?)
.await?;
Ok(())
}
@ -187,10 +232,8 @@ impl Verifier {
msg: &InboxMsg,
content: InboxMsgContent,
) -> Result<(), VerifierError> {
match content {
InboxMsgContent::SocialQuery(SocialQuery::Request(req)) => {
let profile_id_nuri = NuriV0::from_store_repo_string(&req.from_profile_store_repo);
//TODO: check that msg.body.from_overlay matches with req.from_profile_store_repo
@ -221,19 +264,24 @@ impl Verifier {
}
// otherwise, create the forwarder
let (forwarder_nuri_string, forwarder_nuri) = self.create_social_query_forwarder(
let (forwarder_nuri_string, forwarder_nuri) = self
.create_social_query_forwarder(
&social_query_doc_nuri_string,
&NuriV0::repo_id(&req.forwarder_id),
&NuriV0::from_store_repo_string(&req.from_profile_store_repo),
&NuriV0::inbox(&msg.body.from_inbox.unwrap())
).await?;
&NuriV0::inbox(&msg.body.from_inbox.unwrap()),
)
.await?;
let temp_mini_block_storage = Store::new_temp_in_mem();
for block in msg.blocks.iter() {
let _id = temp_mini_block_storage.put(block)?;
}
let commit = Commit::load(req.definition_commit_body_ref.clone(),
&temp_mini_block_storage, true)
let commit = Commit::load(
req.definition_commit_body_ref.clone(),
&temp_mini_block_storage,
true,
)
.map_err(|e| {
//log_err!("err : {:?}", e);
e
@ -244,28 +292,36 @@ impl Verifier {
let mut sparql: Option<String> = None;
for triple in triples {
if triple.predicate.as_str() == "did:ng:x:ng#social_query_sparql" {
sparql = Some(
match triple.object {
sparql = Some(match triple.object {
Term::Literal(l) => l.value().into(),
_ => return Err(VerifierError::InvalidSocialQuery)
_ => return Err(VerifierError::InvalidSocialQuery),
});
break;
}
}
//TODO: in case of errors here below, mark the forwarder as ng:social_query_error
if sparql.is_none() { return Err(VerifierError::InvalidSocialQuery); }
if sparql.is_none() {
return Err(VerifierError::InvalidSocialQuery);
}
//log_info!("{}",sparql.as_ref().unwrap());
let res = self.sparql_query(&NuriV0::new_entire_user_site(), sparql.unwrap(), None).await?;
let res = self
.sparql_query(&NuriV0::new_entire_user_site(), sparql.unwrap(), None)
.await?;
let results = match res {
QueryResults::Boolean(_) | QueryResults::Solutions(_) => return Err(VerifierError::NotImplemented),
QueryResults::Boolean(_) | QueryResults::Solutions(_) => {
return Err(VerifierError::NotImplemented)
}
QueryResults::Graph(triples) => {
let mut results = vec![];
for t in triples {
match t {
Err(e) => { log_err!("{}",e.to_string()); return Err(VerifierError::SparqlError(e.to_string()))},
Err(e) => {
log_err!("{}", e.to_string());
return Err(VerifierError::SparqlError(e.to_string()));
}
Ok(triple) => results.push(triple),
}
}
@ -277,12 +333,13 @@ impl Verifier {
// Do we have local results matching the request's query? If yes, we send them back to the forwarder right away
if !results.is_empty() {
let content = SocialQueryResponseContent::Graph(serde_bare::to_vec(&results).unwrap());
let content =
SocialQueryResponseContent::Graph(serde_bare::to_vec(&results).unwrap());
let post = InboxPost::new_social_query_response_replying_to(
&msg.body,
&req,
content,
reply_with_inbox.clone()
reply_with_inbox.clone(),
)?;
self.post_to_inbox(post).await?;
}
@ -290,18 +347,22 @@ impl Verifier {
// only fan out if we have contacts (that match the grant selected by current user)
// and if degree is > to 1 or equal to zero
if req.degree == 1 {
// ending here.
self.mark_social_query_forwarder(&forwarder_nuri_string, &forwarder_nuri, "social_query_ended".to_string()).await?;
self.mark_social_query_forwarder(
&forwarder_nuri_string,
&forwarder_nuri,
"social_query_ended".to_string(),
)
.await?;
let post = InboxPost::new_social_query_response_replying_to(
&msg.body,
&req,
SocialQueryResponseContent::EndOfReplies,
reply_with_inbox.clone()
reply_with_inbox.clone(),
)?;
self.post_to_inbox(post).await?;
return Ok(())
return Ok(());
}
// fan out forwarded social queries to all contacts (except the one we received it from)
@ -315,12 +376,16 @@ impl Verifier {
FILTER ( bound(?profile_id) && NOT EXISTS {{ ?c ng:site <{profile_id_nuri}> }} && NOT EXISTS {{ ?c ng:protected <{profile_id_nuri}> }} )
}}");
//log_info!("{sparql}");
let sols = match self.sparql_query(
&NuriV0::new_entire_user_site(),
sparql, None).await?
let sols = match self
.sparql_query(&NuriV0::new_entire_user_site(), sparql, None)
.await?
{
QueryResults::Solutions(sols) => { sols }
_ => return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())),
QueryResults::Solutions(sols) => sols,
_ => {
return Err(VerifierError::SparqlError(
NgError::InvalidResponse.to_string(),
))
}
};
let degree = if req.degree == 0 { 0 } else { req.degree - 1 };
@ -350,8 +415,9 @@ impl Verifier {
&req.query_id,
&req.definition_commit_body_ref,
&msg.blocks,
degree
).await?;
degree,
)
.await?;
}
}
}
@ -360,23 +426,26 @@ impl Verifier {
// if not found any contact, we stop here
//log_info!("found contact {found_contact}");
if !found_contact {
self.mark_social_query_forwarder(&forwarder_nuri_string, &forwarder_nuri, "social_query_ended".to_string()).await?;
self.mark_social_query_forwarder(
&forwarder_nuri_string,
&forwarder_nuri,
"social_query_ended".to_string(),
)
.await?;
let post = InboxPost::new_social_query_response_replying_to(
&msg.body,
&req,
SocialQueryResponseContent::EndOfReplies,
reply_with_inbox
reply_with_inbox,
)?;
self.post_to_inbox(post).await?;
}
}
InboxMsgContent::SocialQuery(SocialQuery::Response(response)) => {
if msg.body.from_inbox.is_none() {
// TODO log error
// we do nothing as this is invalid msg. it must have a from.
return Err(VerifierError::InvalidSocialQuery)
return Err(VerifierError::InvalidSocialQuery);
}
let forwarder_nuri = NuriV0::new_repo_target_from_id(&response.forwarder_id);
@ -411,23 +480,53 @@ impl Verifier {
// self.open_for_target(&forwarder_nuri.target, false).await?;
// }
self.open_branch_(&private_store_id, &user_branch_id,
false, &broker, &user, &self.connected_broker.clone(), true ).await?;
self.open_branch_(
&private_store_id,
&user_branch_id,
false,
&broker,
&user,
&self.connected_broker.clone(),
true,
)
.await?;
let main_branch_id = {
self.repos.get(&response.forwarder_id).unwrap().main_branch().unwrap().id
self.repos
.get(&response.forwarder_id)
.unwrap()
.main_branch()
.unwrap()
.id
};
self.open_branch_(&response.forwarder_id, &main_branch_id,
false, &broker, &user, &self.connected_broker.clone(), true ).await?;
self.open_branch_(
&response.forwarder_id,
&main_branch_id,
false,
&broker,
&user,
&self.connected_broker.clone(),
true,
)
.await?;
}
let forwarder_nuri_string = NuriV0::repo_id(&response.forwarder_id);
// checking that we do have a running ForwardedSocialQuery, and that it didnt end, otherwise it must be spam.
match self.sparql_query( &forwarder_nuri, format!("ASK {{ <> <did:ng:x:ng#social_query_id> <{}> }} ",
NuriV0::repo_id(&response.query_id)),Some(forwarder_nuri_string.clone())).await? {
match self
.sparql_query(
&forwarder_nuri,
format!(
"ASK {{ <> <did:ng:x:ng#social_query_id> <{}> }} ",
NuriV0::repo_id(&response.query_id)
),
Some(forwarder_nuri_string.clone()),
)
.await?
{
QueryResults::Boolean(true) => {}
_ => { return Err(VerifierError::InvalidSocialQuery) }
_ => return Err(VerifierError::InvalidSocialQuery),
}
let (forwarded_from_profile, forwarded_from_inbox, from_forwarder) = match self.sparql_query(
&forwarder_nuri,
@ -480,7 +579,8 @@ impl Verifier {
};
// searching for the tokenized commit that added this forwarding.
let spar = format!("PREFIX ng: <did:ng:x:ng#>
let spar = format!(
"PREFIX ng: <did:ng:x:ng#>
SELECT ?token WHERE
{{ ?token ng:social_query_forwarded_to_inbox <{}> .
MINUS {{ ?token ng:social_query_ended ?t . }} .
@ -488,15 +588,22 @@ impl Verifier {
NuriV0::inbox(&msg.body.from_inbox.unwrap())
);
//log_info!("{spar}");
let token = match self.sparql_query(
let token = match self
.sparql_query(
&forwarder_nuri,
//<> ng:social_query_id <{}> NuriV0::inbox(&msg.body.from_inbox.unwrap()),
spar,
Some(NuriV0::repo_id(&response.forwarder_id))).await?
Some(NuriV0::repo_id(&response.forwarder_id)),
)
.await?
{
QueryResults::Solutions(mut sols) => {
match sols.next() {
None => { return Err(VerifierError::SparqlError("Token not found".to_string())); }
None => {
return Err(VerifierError::SparqlError(
"Token not found".to_string(),
));
}
Some(Err(e)) => {
// TODO log error
// we do nothing as we couldn't find the token
@ -508,43 +615,59 @@ impl Verifier {
} else {
// TODO log error
// we do nothing as we couldn't find the token
return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string()));
return Err(VerifierError::SparqlError(
NgError::InvalidResponse.to_string(),
));
}
}
}
}
_ => return Err(VerifierError::SparqlError(NgError::InvalidResponse.to_string())),
_ => {
return Err(VerifierError::SparqlError(
NgError::InvalidResponse.to_string(),
))
}
};
//log_info!("token = {token}");
let at_origin = forwarded_from_profile.is_none() || forwarded_from_inbox.is_none() || from_forwarder.is_none();
let at_origin = forwarded_from_profile.is_none()
|| forwarded_from_inbox.is_none()
|| from_forwarder.is_none();
match response.content {
SocialQueryResponseContent::AlreadyRequested
| SocialQueryResponseContent::EndOfReplies
| SocialQueryResponseContent::Error(_) => {
// ending here this forwarding.
self.mark_social_query_forwarder(&token, &forwarder_nuri, "social_query_ended".to_string()).await?;
self.mark_social_query_forwarder(
&token,
&forwarder_nuri,
"social_query_ended".to_string(),
)
.await?;
// TODO record error
// if we are at the end of the whole ForwardedSocialQuery (no more pending responses)
// we send EndOfReplies upstream, and mark as ended.
let the_end = match self.sparql_query(
let the_end = match self
.sparql_query(
&forwarder_nuri,
format!("PREFIX ng: <did:ng:x:ng#>
format!(
"PREFIX ng: <did:ng:x:ng#>
SELECT ?token WHERE
{{ ?token ng:social_query_forwarded_to_profile ?p .
MINUS {{ ?token ng:social_query_ended ?t . }}
}}"),
None).await?
}}"
),
None,
)
.await?
{
QueryResults::Solutions(mut sols) => {
match sols.next() {
QueryResults::Solutions(mut sols) => match sols.next() {
None => true,
_ => false,
}
}
},
_ => {
// TODO: log error
false
@ -552,32 +675,42 @@ impl Verifier {
};
if the_end {
// marking the end
self.mark_social_query_forwarder(&NuriV0::repo_id(&response.forwarder_id), &forwarder_nuri, "social_query_ended".to_string()).await?;
self.mark_social_query_forwarder(
&NuriV0::repo_id(&response.forwarder_id),
&forwarder_nuri,
"social_query_ended".to_string(),
)
.await?;
if !at_origin {
// getting the privkey of the inbox because we will need it here below to send responses.
let from = self.get_profile_replying_to(forwarded_from_profile.as_ref().unwrap())?;
let from = self.get_profile_replying_to(
forwarded_from_profile.as_ref().unwrap(),
)?;
// sending EndOfReplies upstream
let to_overlay = NuriV0::from_profile_into_overlay_id(forwarded_from_profile.as_ref().unwrap())?;
let to_inbox_id = NuriV0::from_inbox_into_id(forwarded_from_inbox.as_ref().unwrap())?;
let from_forwarder = NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?;
let to_overlay = NuriV0::from_profile_into_overlay_id(
forwarded_from_profile.as_ref().unwrap(),
)?;
let to_inbox_id = NuriV0::from_inbox_into_id(
forwarded_from_inbox.as_ref().unwrap(),
)?;
let from_forwarder =
NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?;
let post = InboxPost::new_social_query_response(
to_overlay,
to_inbox_id,
Some(from),
response.query_id,
from_forwarder,
SocialQueryResponseContent::EndOfReplies
SocialQueryResponseContent::EndOfReplies,
)?;
self.post_to_inbox(post).await?;
}
}
}
SocialQueryResponseContent::Graph(graph) => {
if at_origin {
// insert the triples in the query document
let triples: Vec<Triple> = serde_bare::from_slice(&graph)?;
@ -589,40 +722,59 @@ impl Verifier {
// log_info!("{}",t.to_string());
// }
let overlay_id = self.repos.get(&response.query_id).ok_or(VerifierError::RepoNotFound)?.store.outer_overlay();
let overlay_id = self
.repos
.get(&response.query_id)
.ok_or(VerifierError::RepoNotFound)?
.store
.outer_overlay();
let nuri_ov = NuriV0::repo_graph_name(&response.query_id, &overlay_id);
let graph_name = NamedNode::new_unchecked(&nuri_ov);
let quads = triples.into_iter().map(|t| t.in_graph(graph_name.clone()) ).collect();
let _ = self.prepare_sparql_update(quads, vec![], self.get_peer_id_for_skolem(), 0).await?;
let quads = triples
.into_iter()
.map(|t| t.in_graph(graph_name.clone()))
.collect();
let _ = self
.prepare_sparql_update(
quads,
vec![],
self.get_peer_id_for_skolem(),
0,
)
.await?;
} else {
// we forward upstream
// getting the privkey of the inbox because we will need it here below to send responses.
let from = self.get_profile_replying_to(forwarded_from_profile.as_ref().unwrap())?;
let from = self.get_profile_replying_to(
forwarded_from_profile.as_ref().unwrap(),
)?;
let to_overlay = NuriV0::from_profile_into_overlay_id(forwarded_from_profile.as_ref().unwrap())?;
let to_inbox_id = NuriV0::from_inbox_into_id(forwarded_from_inbox.as_ref().unwrap())?;
let from_forwarder = NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?;
let to_overlay = NuriV0::from_profile_into_overlay_id(
forwarded_from_profile.as_ref().unwrap(),
)?;
let to_inbox_id =
NuriV0::from_inbox_into_id(forwarded_from_inbox.as_ref().unwrap())?;
let from_forwarder =
NuriV0::from_repo_nuri_to_id(from_forwarder.as_ref().unwrap())?;
let post = InboxPost::new_social_query_response(
to_overlay,
to_inbox_id,
Some(from),
response.query_id,
from_forwarder,
SocialQueryResponseContent::Graph(graph)
SocialQueryResponseContent::Graph(graph),
)?;
self.post_to_inbox(post).await?;
}
}
SocialQueryResponseContent::QueryResult(_) | SocialQueryResponseContent::False | SocialQueryResponseContent::True => {
SocialQueryResponseContent::QueryResult(_)
| SocialQueryResponseContent::False
| SocialQueryResponseContent::True => {
// not implemented yet
return Err(VerifierError::NotImplemented)
return Err(VerifierError::NotImplemented);
}
}
}
InboxMsgContent::ContactDetails(details) => {
if msg.body.from_inbox.is_none() {
@ -633,7 +785,11 @@ impl Verifier {
let inbox_nuri_string: String = NuriV0::inbox(&msg.body.from_inbox.unwrap());
let profile_nuri_string: String = NuriV0::from_store_repo_string(&details.profile);
let a_or_b = if details.profile.is_public() { "site" } else { "protected" };
let a_or_b = if details.profile.is_public() {
"site"
} else {
"protected"
};
// checking if this contact has already been added
match self.sparql_query(
@ -646,35 +802,50 @@ impl Verifier {
_ => {}
}
let contact = self.doc_create_with_store_repo(
"Graph".to_string(), "social:contact".to_string(),
"store".to_string(), None // meaning in private store
).await?;
let contact = self
.doc_create_with_store_repo(
"Graph".to_string(),
"social:contact".to_string(),
"store".to_string(),
None, // meaning in private store
)
.await?;
let contact_nuri = NuriV0::new_from_repo_graph(&contact)?;
let contact_id = contact_nuri.target.repo_id().clone();
let contact_nuri_string = NuriV0::repo_id(&contact_id);
let has_email = details.email.map_or("".to_string(), |email| format!("<> vcard:hasEmail \"{email}\"."));
let has_email = details.email.map_or("".to_string(), |email| {
format!("<> vcard:hasEmail \"{email}\".")
});
// adding triples in contact doc
let sparql_update = format!(" PREFIX ng: <did:ng:x:ng#>
let sparql_update = format!(
" PREFIX ng: <did:ng:x:ng#>
PREFIX vcard: <http://www.w3.org/2006/vcard/ns#>
INSERT DATA {{ <> ng:{a_or_b} <{profile_nuri_string}>.
<> ng:{a_or_b}_inbox <{inbox_nuri_string}>.
<> a vcard:Individual .
<> vcard:fn \"{}\".
{has_email} }}", details.name);
{has_email} }}",
details.name
);
let ret = self
.process_sparql_update(&contact_nuri, &sparql_update, &Some(contact_nuri_string), vec![],0)
.process_sparql_update(
&contact_nuri,
&sparql_update,
&Some(contact_nuri_string),
vec![],
0,
)
.await;
if let Err(e) = ret {
return Err(VerifierError::SparqlError(e));
}
self.update_header(&contact_nuri.target, Some(details.name), None).await?;
self.update_header(&contact_nuri.target, Some(details.name), None)
.await?;
}
_ => return Err(VerifierError::NotImplemented)
_ => return Err(VerifierError::NotImplemented),
}
Ok(())
}

@ -44,6 +44,7 @@ pub fn add_remove_triples(
tracked_predicates: HashMap::new(),
parents: HashMap::new(),
valid: OrmTrackedSubjectValidity::Pending,
prev_valid: OrmTrackedSubjectValidity::Pending,
subject_iri: subject_iri.to_string(),
shape: shape.clone(),
}))

@ -257,24 +257,9 @@ impl Verifier {
log_debug!("not applying triples again for subject {subject_iri}");
}
let validity = {
let tracked_subject_opt = orm_subscription
.tracked_subjects
.get(*subject_iri)
.and_then(|m| m.get(&shape.iri));
let Some(tracked_subject) = tracked_subject_opt else {
continue;
}; // skip if missing
tracked_subject.read().unwrap().valid.clone()
};
// Validate the subject.
let need_eval = Self::update_subject_validity(
change,
&shape,
&mut orm_subscription,
validity,
);
let need_eval =
Self::update_subject_validity(change, &shape, &mut orm_subscription);
// We add the need_eval to be processed next after loop.
// Filter out subjects already in the validation stack to prevent double evaluation.
@ -391,14 +376,10 @@ impl Verifier {
shape: Option<&ShapeIri>,
session_id: Option<&u64>,
) -> Result<(UnboundedSender<AppResponse>, &OrmSubscription), VerifierError> {
let subs = self
.orm_subscriptions
.get_mut(nuri)
.unwrap();
let subs = self.orm_subscriptions.get_mut(nuri).unwrap();
subs.retain(|sub| !sub.sender.is_closed());
match
// Filter shapes, if present.
subs.iter()
match subs // Filter shapes, if present.
.iter()
.filter(|s| match shape {
Some(sh) => *sh == s.shape_type.shape,
None => true, // Filter session ids if present.
@ -410,9 +391,7 @@ impl Verifier {
.next()
{
None => Err(VerifierError::OrmSubscriptionNotFound),
Some(subscription) => {
Ok((subscription.sender.clone(), subscription))
}
Some(subscription) => Ok((subscription.sender.clone(), subscription)),
}
}
@ -658,7 +637,8 @@ impl Verifier {
return Ok(return_vals);
}
pub(crate) async fn orm_update(
/// Generate and send JSON patches from GraphQuadsPatch (quad inserts and removes) to JS-land.
pub(crate) async fn orm_backend_update(
&mut self,
session_id: u64,
repo_id: RepoId,
@ -666,26 +646,267 @@ impl Verifier {
patch: GraphQuadsPatch,
) {
let overlaylink: OverlayLink = overlay_id.into();
// We need to apply the patches to all subscriptions we have. We can use process_changes_for_*
// That updates the tracked subjects, validates them, and returns a set of changes structured
// by the respective schema.
let triple_inserts: Vec<Triple> = patch
.inserts
.iter()
.map(|quad| {
Triple::new(
quad.subject.clone(),
quad.predicate.clone(),
quad.object.clone(),
)
})
.collect();
let triple_removes: Vec<Triple> = patch
.removes
.iter()
.map(|quad| {
Triple::new(
quad.subject.clone(),
quad.predicate.clone(),
quad.object.clone(),
)
})
.collect();
// let mut updates = Vec::new();
let mut scopes = vec![];
for (scope, subs) in self.orm_subscriptions.iter_mut() {
// Remove old subscriptions
subs.retain(|sub| !sub.sender.is_closed());
if scope.target == NuriTargetV0::UserSite
|| scope.overlay.as_ref().map_or(false, |ol| overlaylink == *ol)
|| scope
.overlay
.as_ref()
.map_or(false, |ol| overlaylink == *ol)
|| scope.target == NuriTargetV0::Repo(repo_id)
{
for sub in subs {
if sub.session_id != session_id { // this is incorrect. we are excluding all the subscriptions from the originating session,
// while we should only exclude the one with exact same shape_type. but we don't have access to that here
continue;
}
// TODO: implement this, generate orm_diff using the patch and the sub.shape_type
let orm_diff: OrmDiff = vec![];
// prepare to apply updates to tracked subjects and record the changes.
let shapes = subs
.iter()
.map(|sub| {
sub.shape_type
.schema
.get(&sub.shape_type.shape)
.unwrap()
.clone()
})
.collect::<Vec<_>>();
scopes.push((scope.clone(), shapes));
}
for (scope, shapes) in scopes {
let mut orm_changes: OrmChanges = HashMap::new();
// actually applying updates to tracked subjects and record the changes.
for shape_arc in shapes {
let _ = self.process_changes_for_shape_and_session(
&scope,
shape_arc,
session_id,
&triple_inserts,
&triple_removes,
&mut orm_changes,
false,
);
}
_ = sub.sender.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec()))).await;
let subs = self.orm_subscriptions.get(&scope).unwrap();
for sub in subs.iter() {
// TODO: This if-condition is wrong (intended to not re-apply changes coming from the same subscription).
if sub.session_id != session_id {
// Create diff from changes & subscription.
let mut patches: OrmDiff = vec![];
let mut path: Vec<String> = vec![];
fn create_patches_for_changed_subj(
orm_changes: &OrmChanges,
patches: &mut OrmDiff,
shape_iri: &String,
subject_iri: &String,
sub: &OrmSubscription,
path: &mut Vec<String>,
) {
let change = orm_changes
.get(shape_iri)
.unwrap()
.get(subject_iri)
.unwrap();
let subject_shape = sub.shape_type.schema.get(shape_iri).unwrap();
// Iterate over every predicate change and create patches
for (pred_iri, pred_change) in change.predicates.iter() {
let pred_shape = subject_shape
.predicates
.iter()
.find(|p| p.iri == *pred_iri)
.unwrap();
let is_multi =
pred_shape.maxCardinality > 1 || pred_shape.maxCardinality == -1;
let is_object =
pred_shape.dataTypes.iter().any(|dt| !dt.shape.is_none());
let pred_name = pred_shape.readablePredicate.clone();
path.push(pred_name);
let path_str = path.join("/");
// Depending on the predicate type, add the respective diff operation.
// Single primitive value
if !is_multi && !is_object {
if pred_change.values_added.len() > 0 {
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: None,
path: path_str.clone(),
value: Some(json!(pred_change.values_added[0])),
});
}
if pred_change.values_removed.len() > 0 {
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: None,
path: path_str,
value: Some(json!(pred_change.values_added[0])),
});
}
} else if is_multi && !is_object {
// Set of primitive values
if pred_change.values_added.len() > 0 {
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: Some(OrmDiffType::set),
path: path_str.clone(),
value: Some(json!(pred_change.values_added)),
});
}
if pred_change.values_removed.len() > 0 {
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::set),
path: path_str,
value: Some(json!(pred_change.values_removed)),
});
}
} else if !is_multi && is_object {
// Single object.
if pred_change.values_added.len() > 0 {
// Object was added. That means, we need to add a basic object with no value,
// Then add further predicates to it in a recursive call.
patches.push(OrmDiffOp {
op: OrmDiffOpType::add,
valType: Some(OrmDiffType::object),
path: path_str.clone(),
value: None,
});
// Apply changes for nested object.
create_patches_for_changed_subj(
orm_changes,
patches,
&pred_shape.dataTypes[0].shape.as_ref().unwrap(), // TODO: We need to get to the information which of the object types was validated successfully.
match &pred_change.values_added[0] {
BasicType::Str(str) => &str,
_ => panic!("Nested object should be a string."),
},
sub,
path,
);
} else {
// Object is removed.
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: path_str,
value: None,
});
}
} else if is_multi && is_object {
// Add every object added.
for object_iri_added in pred_change.values_added.iter() {
// TODO: As with single object, just that we add the IRI to the path.
// We also need to check if the object existed before.
}
let tracked_predicate =
pred_change.tracked_predicate.read().unwrap();
// Delete every object removed.
if tracked_predicate.tracked_children.len() == 0 {
// Or the whole thing if no children remain
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: path_str,
value: None,
});
} else {
for object_iri_removed in pred_change.values_removed.iter() {
patches.push(OrmDiffOp {
op: OrmDiffOpType::remove,
valType: Some(OrmDiffType::object),
path: format!(
"{}/{}",
path_str,
match object_iri_removed {
BasicType::Str(iri) => iri,
_ => panic!("Object IRI must be string"),
}
),
value: None,
});
}
}
}
// Is the tracked subject valid and was it before?
// Just regular patch for each predicate.
// If the predicate is a nested object...
// We need to recurse. Do the same as here but append to the path.
// Is the tracked subject invalid and was valid before?
// Delete of root
// Did the subject become valid but was invalid before?
// We need to compose a orm object (based on the data we luckily have already).
// Was the subject invalid and is still invalid?
// Nothing to do.
// Remove to this predicate name from the path again.
path.pop();
}
}
// For each tracked subject that has the subscription's shape, call fn above
for subject_iri in sub.tracked_subjects.iter() { // TODO
}
//
let orm_diff: OrmDiff = vec![];
let _ = sub
.sender
.clone()
.send(AppResponse::V0(AppResponseV0::OrmUpdate(orm_diff.to_vec())))
.await;
}
}
}
}
/// After creating new objects (without an id) in JS-land,
/// we send the generated id for those back.
/// If something went wrong (revert_inserts / revert_removes not empty),
/// we send a JSON patch back to revert the made changes.
pub(crate) async fn orm_update_self(
&mut self,
scope: &NuriV0,
@ -695,7 +916,6 @@ impl Verifier {
revert_inserts: Vec<Quad>,
revert_removes: Vec<Quad>,
) -> Result<(), VerifierError> {
let (mut sender, orm_subscription) =
self.get_first_orm_subscription_sender_for(scope, Some(&shape_iri), Some(&session_id))?;
@ -703,7 +923,11 @@ impl Verifier {
// use orm_subscription if needed
// note(niko): I think skolemnized blank nodes can still be many, in case of multi-level nested sub-objects.
let orm_bnids = vec![];
let _ = sender.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(orm_bnids))).await;
let _ = sender
.send(AppResponse::V0(AppResponseV0::OrmUpdateBlankNodeIds(
orm_bnids,
)))
.await;
// TODO (later) revert the inserts and removes
// let orm_diff = vec![];
@ -712,6 +936,7 @@ impl Verifier {
Ok(())
}
/// Handles updates coming from JS-land (JSON patches).
pub(crate) async fn orm_frontend_update(
&mut self,
session_id: u64,
@ -727,7 +952,6 @@ impl Verifier {
diff
);
// find OrmSubscription
let (doc_nuri, sparql_update) = {
let orm_subscription =
self.get_first_orm_subscription_for(scope, Some(&shape_iri), Some(&session_id));
@ -763,7 +987,8 @@ impl Verifier {
revert_inserts,
revert_removes,
)
.await.map_err(|e|e.to_string())?;
.await
.map_err(|e| e.to_string())?;
}
Ok(())
}
@ -808,7 +1033,9 @@ impl Verifier {
let orm_objects = self.create_orm_object_for_shape(nuri, session_id, &shape_type)?;
// log_debug!("create_orm_object_for_shape return {:?}", orm_objects);
let _ = tx.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects))).await;
let _ = tx
.send(AppResponse::V0(AppResponseV0::OrmInitial(orm_objects)))
.await;
let close = Box::new(move || {
log_debug!("closing ORM subscription");

@ -23,8 +23,11 @@ pub struct OrmTrackedSubject {
/// If this is a nested subject, this records the parents
/// and if they are currently tracking this subject.
pub parents: HashMap<String, Arc<RwLock<OrmTrackedSubject>>>,
/// Validity. When untracked, triple updates are not processed here.
/// Validity. When untracked, triple updates are not processed for this tracked subject.
pub valid: OrmTrackedSubjectValidity,
/// Previous validity. Used for validation and creating JSON Patch diffs from changes.
pub prev_valid: OrmTrackedSubjectValidity,
/// Subject IRI
pub subject_iri: String,
/// The shape for which the predicates are tracked.
pub shape: Arc<OrmSchemaShape>,

@ -22,7 +22,6 @@ impl Verifier {
s_change: &OrmTrackedSubjectChange,
shape: &OrmSchemaShape,
orm_subscription: &mut OrmSubscription,
previous_validity: OrmTrackedSubjectValidity,
) -> Vec<(SubjectIri, ShapeIri, NeedsFetchBool)> {
let tracked_subjects = &mut orm_subscription.tracked_subjects;
@ -33,6 +32,9 @@ impl Verifier {
return vec![];
};
let mut tracked_subject = tracked_subject.write().unwrap();
let previous_validity = tracked_subject.prev_valid.clone();
tracked_subject.prev_valid = tracked_subject.valid.clone();
// Keep track of objects that need to be validated against a shape to fetch and validate.
let mut need_evaluation: Vec<(String, String, bool)> = vec![];

@ -80,7 +80,12 @@ impl UserStorage for RocksDbUserStorage {
RepoStorage::update_signer_cap(signer_cap, &self.user_storage)
}
fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError> {
fn update_inbox_cap(
&self,
repo_id: &RepoId,
overlay: &OverlayId,
priv_key: &PrivKey,
) -> Result<(), StorageError> {
RepoStorage::update_inbox_cap(repo_id, overlay, priv_key, &self.user_storage)
}

@ -125,8 +125,11 @@ impl SiteV0 {
// Creating the Inbox commit body about public store.
let public_store_inbox_commit_body =
CommitBody::V0(CommitBodyV0::AddInboxCap(
AddInboxCap::new_v0(public_repo.id, public_repo.store.outer_overlay(), public_repo.inbox.to_owned().unwrap())));
CommitBody::V0(CommitBodyV0::AddInboxCap(AddInboxCap::new_v0(
public_repo.id,
public_repo.store.outer_overlay(),
public_repo.inbox.to_owned().unwrap(),
)));
let protected_repo = verifier
.new_store_default(
@ -143,8 +146,11 @@ impl SiteV0 {
// Creating the Inbox commit body about protected store.
let protected_store_inbox_commit_body =
CommitBody::V0(CommitBodyV0::AddInboxCap(
AddInboxCap::new_v0(protected_repo.id, protected_repo.store.outer_overlay(),protected_repo.inbox.to_owned().unwrap())));
CommitBody::V0(CommitBodyV0::AddInboxCap(AddInboxCap::new_v0(
protected_repo.id,
protected_repo.store.outer_overlay(),
protected_repo.inbox.to_owned().unwrap(),
)));
let private_repo = verifier
.new_store_default(

@ -59,8 +59,24 @@ impl GraphTransaction {
}
pub(crate) fn as_quads_patch(&self, graph_nuri: String) -> GraphQuadsPatch {
GraphQuadsPatch {
inserts: self.inserts.iter().map(|triple| triple.clone().in_graph(NamedNode::new(graph_nuri.clone()).unwrap())).collect(),
removes: self.removes.iter().map(|triple| triple.clone().in_graph(NamedNode::new(graph_nuri.clone()).unwrap())).collect(),
inserts: self
.inserts
.iter()
.map(|triple| {
triple
.clone()
.in_graph(NamedNode::new(graph_nuri.clone()).unwrap())
})
.collect(),
removes: self
.removes
.iter()
.map(|triple| {
triple
.clone()
.in_graph(NamedNode::new(graph_nuri.clone()).unwrap())
})
.collect(),
}
}
pub(crate) fn tokenize_with_commit_id(&mut self, commit_id: ObjectId, repo_id: &RepoId) {

@ -42,7 +42,12 @@ pub trait UserStorage: Send + Sync {
fn update_signer_cap(&self, signer_cap: &SignerCap) -> Result<(), StorageError>;
fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError>;
fn update_inbox_cap(
&self,
repo_id: &RepoId,
overlay: &OverlayId,
priv_key: &PrivKey,
) -> Result<(), StorageError>;
fn update_certificate(
&self,
@ -196,7 +201,12 @@ impl UserStorage for InMemoryUserStorage {
Ok(lock.remove(repo_id).ok_or(StorageError::NotFound)?)
}
fn update_inbox_cap(&self, repo_id: &RepoId, overlay: &OverlayId, priv_key: &PrivKey) -> Result<(), StorageError> {
fn update_inbox_cap(
&self,
repo_id: &RepoId,
overlay: &OverlayId,
priv_key: &PrivKey,
) -> Result<(), StorageError> {
let mut lock = self.repo_inbox_cap.write().unwrap();
lock.insert(*repo_id, priv_key.clone());
Ok(())

@ -66,7 +66,7 @@ pub struct AccessRequestV0 {
pub optional: bool,
/// request depends on another request (only if optional)
pub depends_on: Option<String>
pub depends_on: Option<String>,
}
impl AccessRequestV0 {
@ -98,7 +98,7 @@ pub struct AppComponentV0 {
/// Name of the component, can be an official component of the for n:g:z, or custom ones n:xxx:z:yyy or o:xxx
pub name: String,
pub component_type: AppComponentType
pub component_type: AppComponentType,
}
/// Primary Class Install Version 0
@ -107,7 +107,7 @@ pub struct PrimaryClassInstallV0 {
/// Primary Class name, can be an official name or a custom name of the form app:n... or app:o:...
pub primary_class: String,
pub components: Vec<AppComponentV0>
pub components: Vec<AppComponentV0>,
}
/// App Manifest Version 0
@ -157,8 +157,7 @@ pub enum AppManifest {
impl AppManifest {
pub fn new_for_origin_all_access_v0(origin: String) -> Self {
AppManifest::V0(
AppManifestV0 {
AppManifest::V0(AppManifestV0 {
nuri: None,
origin: Some(origin),
singleton: true,
@ -169,13 +168,11 @@ impl AppManifest {
title: None,
description: None,
icon: vec![],
image: vec![]
}
)
image: vec![],
})
}
pub fn new_v0(origin: String, singleton: bool, access_requests: Vec<AccessRequestV0>) -> Self {
AppManifest::V0(
AppManifestV0 {
AppManifest::V0(AppManifestV0 {
nuri: None,
origin: Some(origin),
singleton,
@ -186,9 +183,8 @@ impl AppManifest {
title: None,
description: None,
icon: vec![],
image: vec![]
}
)
image: vec![],
})
}
pub fn to_url_param(&self) -> String {
let ser = serde_bare::to_vec(self).unwrap();
@ -224,9 +220,5 @@ pub struct AccessGrantV0 {
pub grantee: UserId,
/// grant depends on another grant
pub depends_on: Option<String>
pub depends_on: Option<String>,
}

@ -485,11 +485,17 @@ pub enum SensitiveWallet {
}
impl SensitiveWallet {
pub fn get_bootstrap_iframe_msgs(brokers: HashMap<String, Vec<BrokerInfoV0>>) -> Vec<BootstrapIframeMsg> {
brokers.values().flatten().filter_map(|broker_info| match broker_info {
pub fn get_bootstrap_iframe_msgs(
brokers: HashMap<String, Vec<BrokerInfoV0>>,
) -> Vec<BootstrapIframeMsg> {
brokers
.values()
.flatten()
.filter_map(|broker_info| match broker_info {
BrokerInfoV0::CoreV0(_) => None,
BrokerInfoV0::ServerV0(s) => Some(s.to_iframe_msg())
}).collect::<Vec<BootstrapIframeMsg>>()
BrokerInfoV0::ServerV0(s) => Some(s.to_iframe_msg()),
})
.collect::<Vec<BootstrapIframeMsg>>()
}
pub fn privkey(&self) -> PrivKey {
match self {
@ -1478,4 +1484,3 @@ pub struct ShuffledPazzle {
pub category_indices: Vec<u8>,
pub emoji_indices: Vec<Vec<u8>>,
}

@ -30,11 +30,11 @@ use ng_repo::types::*;
use ng_repo::utils::timestamp_after;
use ng_net::actors::admin::add_invitation::*;
use ng_net::bsps::BSP_DETAILS;
use ng_net::broker::BROKER;
use ng_net::bsps::BSP_DETAILS;
use ng_net::types::{
AdminResponseContentV0, BindAddress, CreateAccountBSP, Invitation, InvitationCode,
APP_ACCOUNT_REGISTERED_SUFFIX, NG_APP_URL, NG_NET_URL
APP_ACCOUNT_REGISTERED_SUFFIX, NG_APP_URL, NG_NET_URL,
};
use ng_client_ws::remote_ws::ConnectionWebSocket;
@ -51,9 +51,7 @@ struct AuthStatic;
#[folder = "../net-bootstrap/dist"]
struct BootstrapStatic;
struct Server {
}
struct Server {}
// impl Server {
// pub async fn register(self: Arc<Self>, ca: String) -> Result<Response, Infallible> {
@ -98,9 +96,7 @@ async fn main() -> anyhow::Result<()> {
}
env_logger::init();
let server = Arc::new(Server {
});
let server = Arc::new(Server {});
// GET /api/v1/register/ca with the same ?ca= query param => 201 CREATED
// let register_api = warp::get()
@ -125,14 +121,25 @@ async fn main() -> anyhow::Result<()> {
.map(|reply, p: HashMap<String, String>| match p.get("o") {
Some(obj) => {
let decoded = obj.trim();
if BSP_DETAILS.get(decoded).is_none() && decoded != "http://localhost:14400" && decoded != "http://localhost:1421" {
if BSP_DETAILS.get(decoded).is_none()
&& decoded != "http://localhost:14400"
&& decoded != "http://localhost:1421"
{
// rejected (BSP not listed)
warp::http::StatusCode::UNAUTHORIZED.into_response()
} else {
let reply = warp::reply::with_header(reply, "Content-Security-Policy",
HeaderValue::from_str(&format!("frame-ancestors 'self' {decoded};")).unwrap());
warp::reply::with_header(reply, "X-Frame-Options",
HeaderValue::from_str(&format!("ALLOW-FROM {decoded}")).unwrap()).into_response()
let reply = warp::reply::with_header(
reply,
"Content-Security-Policy",
HeaderValue::from_str(&format!("frame-ancestors 'self' {decoded};"))
.unwrap(),
);
warp::reply::with_header(
reply,
"X-Frame-Options",
HeaderValue::from_str(&format!("ALLOW-FROM {decoded}")).unwrap(),
)
.into_response()
}
}
None => warp::http::StatusCode::BAD_REQUEST.into_response(),
@ -146,13 +153,24 @@ async fn main() -> anyhow::Result<()> {
.map(|reply, p: HashMap<String, String>| match p.get("o") {
Some(obj) => {
let decoded = obj.trim();
if decoded.eq("*") || (!decoded.starts_with("http://") && !decoded.starts_with("https://")) || decoded.len() < 11 {
if decoded.eq("*")
|| (!decoded.starts_with("http://") && !decoded.starts_with("https://"))
|| decoded.len() < 11
{
warp::http::StatusCode::BAD_REQUEST.into_response()
} else {
let reply = warp::reply::with_header(reply, "Content-Security-Policy",
HeaderValue::from_str(&format!("frame-ancestors 'self' {decoded};")).unwrap());
warp::reply::with_header(reply, "X-Frame-Options",
HeaderValue::from_str(&format!("ALLOW-FROM {decoded}")).unwrap()).into_response()
let reply = warp::reply::with_header(
reply,
"Content-Security-Policy",
HeaderValue::from_str(&format!("frame-ancestors 'self' {decoded};"))
.unwrap(),
);
warp::reply::with_header(
reply,
"X-Frame-Options",
HeaderValue::from_str(&format!("ALLOW-FROM {decoded}")).unwrap(),
)
.into_response()
}
}
None => warp::http::StatusCode::BAD_REQUEST.into_response(),
@ -187,7 +205,12 @@ async fn main() -> anyhow::Result<()> {
// TODO when there will be an API again, we will call it from any BSPs.
// we should add the list of all BSPs origin's here
log::info!("Starting production server on http://localhost:3033");
warp::serve(static_files.or(static_files_auth.or(static_files_bootstrap)).with(cors).with(incoming_log))
warp::serve(
static_files
.or(static_files_auth.or(static_files_bootstrap))
.with(cors)
.with(incoming_log),
)
.run(([127, 0, 0, 1], 3033))
.await;
}
@ -196,7 +219,12 @@ async fn main() -> anyhow::Result<()> {
log_debug!("CORS: any origin");
cors = cors.allow_any_origin();
log::info!("Starting server on http://localhost:3033");
warp::serve(static_files.or(static_files_auth.or(static_files_bootstrap)).with(cors).with(incoming_log))
warp::serve(
static_files
.or(static_files_auth.or(static_files_bootstrap))
.with(cors)
.with(incoming_log),
)
// TODO: Change this to local network ip?
.run(([127, 0, 0, 1], 3033))
.await;

Loading…
Cancel
Save