improve sync of user nad forwarder branches when receving inbox msg

master
Niko PLP 2 weeks ago
parent acdfd37099
commit 0abbb866d5
  1. 14
      ng-net/src/types.rs
  2. 55
      ng-verifier/src/inbox_processor.rs
  3. 16
      ng-verifier/src/request_processor.rs

@ -3686,10 +3686,9 @@ impl InboxPost {
from: Option<(OverlayId,PrivKey)>, from: Option<(OverlayId,PrivKey)>,
query_id: RepoId, query_id: RepoId,
forwarder_id: RepoId, forwarder_id: RepoId,
forwarder_readcap: ReadCap,
content: SocialQueryResponseContent content: SocialQueryResponseContent
) -> Result<Self,NgError> { ) -> Result<Self,NgError> {
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, forwarder_readcap, content })); let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content }));
Self::new(to_overlay, to_inbox, from, &content, vec![], None) Self::new(to_overlay, to_inbox, from, &content, vec![], None)
} }
@ -3705,8 +3704,7 @@ impl InboxPost {
let from = Some((msg.to_overlay, inbox_privkey)); let from = Some((msg.to_overlay, inbox_privkey));
let query_id = request.query_id; let query_id = request.query_id;
let forwarder_id = request.forwarder_id; let forwarder_id = request.forwarder_id;
let forwarder_readcap = request.forwarder_readcap.clone(); let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, content }));
let content = InboxMsgContent::SocialQuery(SocialQuery::Response(SocialQueryResponse { query_id, forwarder_id, forwarder_readcap, content }));
Self::new(to_overlay, to_inbox, from, &content, vec![], None) Self::new(to_overlay, to_inbox, from, &content, vec![], None)
} }
@ -3716,7 +3714,6 @@ impl InboxPost {
from_profile_store_repo: StoreRepo, from_profile_store_repo: StoreRepo,
from_inbox: PrivKey, from_inbox: PrivKey,
forwarder_id: RepoId, forwarder_id: RepoId,
forwarder_readcap: ReadCap,
to_profile_nuri: String, to_profile_nuri: String,
to_inbox_nuri: String, to_inbox_nuri: String,
to_broker: Option<Locator>, to_broker: Option<Locator>,
@ -3748,7 +3745,6 @@ impl InboxPost {
let content = InboxMsgContent::SocialQuery(SocialQuery::Request(SocialQueryRequest{ let content = InboxMsgContent::SocialQuery(SocialQuery::Request(SocialQueryRequest{
query_id, query_id,
forwarder_id, forwarder_id,
forwarder_readcap,
from_profile_store_repo, from_profile_store_repo,
degree, degree,
definition_commit_body_ref, definition_commit_body_ref,
@ -4182,9 +4178,6 @@ pub struct SocialQueryRequest {
/// Forwarder ID /// Forwarder ID
pub forwarder_id: RepoId, pub forwarder_id: RepoId,
/// Forwarder read cap
pub forwarder_readcap: ReadCap,
/// Profile ID (must match the from_overlay) /// Profile ID (must match the from_overlay)
pub from_profile_store_repo: StoreRepo, pub from_profile_store_repo: StoreRepo,
@ -4218,9 +4211,6 @@ pub struct SocialQueryResponse {
/// Forwarder ID /// Forwarder ID
pub forwarder_id: RepoId, pub forwarder_id: RepoId,
/// Forwarder read cap
pub forwarder_readcap: ReadCap,
/// Response content /// Response content
pub content: SocialQueryResponseContent, pub content: SocialQueryResponseContent,
} }

@ -46,9 +46,9 @@ impl Verifier {
from_forwarder_nuri_string: &String, from_forwarder_nuri_string: &String,
from_profile_nuri_string: &String, from_profile_nuri_string: &String,
from_inbox_nuri_string: &String, from_inbox_nuri_string: &String,
) -> Result<(String, NuriV0, ReadCap), VerifierError> { ) -> Result<(String, NuriV0), VerifierError> {
// creating the ForwardedSocialQuery in the private store // creating the ForwardedSocialQuery in the private store
let (forwarder, readcap) = self.doc_create_with_store_repo( let forwarder = self.doc_create_with_store_repo(
"Graph".to_string(), "social:query:forwarded".to_string(), "Graph".to_string(), "social:query:forwarded".to_string(),
"store".to_string(), None // meaning in private store "store".to_string(), None // meaning in private store
).await?; ).await?;
@ -70,7 +70,7 @@ impl Verifier {
if let Err(e) = ret { if let Err(e) = ret {
return Err(VerifierError::SparqlError(e)); return Err(VerifierError::SparqlError(e));
} }
Ok((forwarder_nuri_string,forwarder_nuri, readcap)) Ok((forwarder_nuri_string,forwarder_nuri))
} }
pub(crate) async fn mark_social_query_forwarder(&mut self, forwarder_nuri_string: &String, forwarder_nuri: &NuriV0, predicate: String) -> Result<(), VerifierError> { pub(crate) async fn mark_social_query_forwarder(&mut self, forwarder_nuri_string: &String, forwarder_nuri: &NuriV0, predicate: String) -> Result<(), VerifierError> {
@ -135,7 +135,6 @@ impl Verifier {
to_inbox_nuri: &String, to_inbox_nuri: &String,
forwarder_nuri: &NuriV0, forwarder_nuri: &NuriV0,
forwarder_id: &RepoId, forwarder_id: &RepoId,
forwarder_readcap: &ReadCap,
from_profiles: &( from_profiles: &(
(StoreRepo, PrivKey), // public (StoreRepo, PrivKey), // public
(StoreRepo, PrivKey) // protected (StoreRepo, PrivKey) // protected
@ -171,7 +170,6 @@ impl Verifier {
from_profile.0, from_profile.0,
from_profile.1.clone(), from_profile.1.clone(),
*forwarder_id, *forwarder_id,
forwarder_readcap.clone(),
to_profile_nuri.clone(), to_profile_nuri.clone(),
to_inbox_nuri.clone(), to_inbox_nuri.clone(),
None, None,
@ -223,7 +221,7 @@ impl Verifier {
} }
// otherwise, create the forwarder // otherwise, create the forwarder
let (forwarder_nuri_string, forwarder_nuri, readcap) = self.create_social_query_forwarder( let (forwarder_nuri_string, forwarder_nuri) = self.create_social_query_forwarder(
&social_query_doc_nuri_string, &social_query_doc_nuri_string,
&NuriV0::repo_id(&req.forwarder_id), &NuriV0::repo_id(&req.forwarder_id),
&NuriV0::from_store_repo_string(&req.from_profile_store_repo), &NuriV0::from_store_repo_string(&req.from_profile_store_repo),
@ -348,7 +346,6 @@ impl Verifier {
to_inbox_nuri, to_inbox_nuri,
&forwarder_nuri, &forwarder_nuri,
&forwarder_id, &forwarder_id,
&readcap,
&from_profiles, &from_profiles,
&req.query_id, &req.query_id,
&req.definition_commit_body_ref, &req.definition_commit_body_ref,
@ -388,26 +385,34 @@ impl Verifier {
{ {
let broker = BROKER.read().await; let broker = BROKER.read().await;
let user = Some(self.user_id().clone()); let user = Some(self.user_id().clone());
let remote = (&self.connected_broker).into(); //let remote = (&self.connected_broker).into();
let private_store = self let (user_branch_id, private_store_id) = {
let private_store = self
.repos .repos
.get(self.private_store_id()) .get(self.private_store_id())
.ok_or(NgError::StoreNotFound)?; .ok_or(NgError::StoreNotFound)?;
if self.repos.get(&response.forwarder_id).is_none() {
(private_store.user_branch().unwrap().id, private_store.id)
// we need to load the forwarder };
self.load_repo_from_read_cap(
&response.forwarder_readcap, // if self.repos.get(&response.forwarder_id).is_none() {
&broker,
&user, // // we need to load the forwarder
&remote, // self.load_repo_from_read_cap(
Arc::clone(&private_store.store), // &response.forwarder_readcap,
true, // &broker,
) // &user,
.await?; // &remote,
self.open_for_target(&forwarder_nuri.target, false).await?; // Arc::clone(&private_store.store),
} // true,
// )
// .await?;
// self.open_for_target(&forwarder_nuri.target, false).await?;
// }
self.open_branch_(&private_store_id, &user_branch_id,
false, &broker, &user, &self.connected_broker.clone(), true ).await?;
let main_branch_id = { let main_branch_id = {
self.repos.get(&response.forwarder_id).unwrap().main_branch().unwrap().id self.repos.get(&response.forwarder_id).unwrap().main_branch().unwrap().id
@ -563,7 +568,6 @@ impl Verifier {
Some(from), Some(from),
response.query_id, response.query_id,
from_forwarder, from_forwarder,
response.forwarder_readcap,
SocialQueryResponseContent::EndOfReplies SocialQueryResponseContent::EndOfReplies
)?; )?;
self.post_to_inbox(post).await?; self.post_to_inbox(post).await?;
@ -607,7 +611,6 @@ impl Verifier {
Some(from), Some(from),
response.query_id, response.query_id,
from_forwarder, from_forwarder,
response.forwarder_readcap,
SocialQueryResponseContent::Graph(graph) SocialQueryResponseContent::Graph(graph)
)?; )?;
self.post_to_inbox(post).await?; self.post_to_inbox(post).await?;
@ -643,7 +646,7 @@ impl Verifier {
_ => {} _ => {}
} }
let (contact, _) = self.doc_create_with_store_repo( let contact = self.doc_create_with_store_repo(
"Graph".to_string(), "social:contact".to_string(), "Graph".to_string(), "social:contact".to_string(),
"store".to_string(), None // meaning in private store "store".to_string(), None // meaning in private store
).await?; ).await?;

@ -618,7 +618,7 @@ impl Verifier {
class_name: String, class_name: String,
destination: String, destination: String,
store_repo: Option<StoreRepo>, store_repo: Option<StoreRepo>,
) -> Result<(String, ReadCap), NgError> { ) -> Result<String, NgError> {
let class = BranchCrdt::from(crdt, class_name)?; let class = BranchCrdt::from(crdt, class_name)?;
@ -656,7 +656,7 @@ impl Verifier {
&mut self, &mut self,
nuri: NuriV0, nuri: NuriV0,
doc_create: DocCreate doc_create: DocCreate
) -> Result<(String, ReadCap), NgError> { ) -> Result<String, NgError> {
//TODO: deal with doc_create.destination //TODO: deal with doc_create.destination
let user_id = self.user_id().clone(); let user_id = self.user_id().clone();
@ -672,10 +672,9 @@ impl Verifier {
) )
.await?; .await?;
let (read_cap, header_branch_id) = { let header_branch_id = {
let repo = self.get_repo(&repo_id, &store)?; let repo = self.get_repo(&repo_id, &store)?;
(repo.read_cap.to_owned().unwrap(), repo.header_branch().ok_or(NgError::BranchNotFound)?.id
repo.header_branch().ok_or(NgError::BranchNotFound)?.id)
}; };
// adding an AddRepo commit to the Store branch of store. // adding an AddRepo commit to the Store branch of store.
@ -711,7 +710,7 @@ impl Verifier {
if let Err(e) = ret { if let Err(e) = ret {
return Err(NgError::SparqlError(e.to_string())); return Err(NgError::SparqlError(e.to_string()));
} }
Ok((nuri_result,read_cap)) Ok(nuri_result)
} }
fn get_profile_for_inbox_post(&self, public: bool) -> Result<(StoreRepo, PrivKey),NgError> { fn get_profile_for_inbox_post(&self, public: bool) -> Result<(StoreRepo, PrivKey),NgError> {
@ -927,7 +926,7 @@ impl Verifier {
} }
// creating the ForwardedSocialQuery in the private store // creating the ForwardedSocialQuery in the private store
let (forwarder, forwarder_readcap) = self.doc_create_with_store_repo( let forwarder = self.doc_create_with_store_repo(
"Graph".to_string(), "social:query:forwarded".to_string(), "Graph".to_string(), "social:query:forwarded".to_string(),
"store".to_string(), None // meaning in private store "store".to_string(), None // meaning in private store
).await?; ).await?;
@ -965,7 +964,6 @@ impl Verifier {
&to_inbox_nuri, &to_inbox_nuri,
&forwarder_nuri, &forwarder_nuri,
&forwarder_id, &forwarder_id,
&forwarder_readcap,
&from_profiles, &from_profiles,
query_id, query_id,
&definition_commit_body_ref, &definition_commit_body_ref,
@ -1061,7 +1059,7 @@ impl Verifier {
match self.doc_create(nuri, doc_create).await { match self.doc_create(nuri, doc_create).await {
Err(NgError::SparqlError(e)) => Ok(AppResponse::error(e)), Err(NgError::SparqlError(e)) => Ok(AppResponse::error(e)),
Err(e) => Err(e), Err(e) => Err(e),
Ok((nuri_result,_)) => Ok(AppResponse::V0(AppResponseV0::Nuri(nuri_result))) Ok(nuri_result) => Ok(AppResponse::V0(AppResponseV0::Nuri(nuri_result)))
} }
} else { } else {
Err(NgError::InvalidPayload) Err(NgError::InvalidPayload)

Loading…
Cancel
Save