branch_history API

pull/19/head
Niko PLP 6 months ago
parent 446e5cafa8
commit 4dbf3aa648
  1. 88
      ng-net/src/app_protocol.rs
  2. 2
      ng-repo/src/branch.rs
  3. 55
      ng-repo/src/commit.rs
  4. 1
      ng-repo/src/object.rs
  5. 98
      ng-repo/src/repo.rs
  6. 96
      ng-repo/src/types.rs
  7. 3
      ng-sdk-js/app-node/index.js
  8. 27
      ng-sdk-js/src/lib.rs
  9. 16
      ng-verifier/src/commits/mod.rs
  10. 36
      ng-verifier/src/request_processor.rs
  11. 25
      ng-verifier/src/verifier.rs

@ -9,7 +9,10 @@
//! App Protocol (between LocalBroker and Verifier) //! App Protocol (between LocalBroker and Verifier)
use std::collections::HashMap;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use ng_repo::repo::CommitInfo;
use ng_repo::utils::decode_overlayid; use ng_repo::utils::decode_overlayid;
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -46,6 +49,7 @@ pub enum AppFetchContentV0 {
ReadQuery, // more to be detailed ReadQuery, // more to be detailed
WriteQuery, // more to be detailed WriteQuery, // more to be detailed
RdfDump, RdfDump,
History,
} }
impl AppFetchContentV0 { impl AppFetchContentV0 {
@ -134,6 +138,29 @@ impl NuriTargetV0 {
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitInfoJs {
pub past: Vec<String>,
pub key: String,
pub signature: Option<String>,
pub author: String,
pub final_consistency: bool,
pub commit_type: CommitType,
}
impl From<&CommitInfo> for CommitInfoJs {
fn from(info: &CommitInfo) -> Self {
CommitInfoJs {
past: info.past.iter().map(|objid| objid.to_string()).collect(),
key: info.key.to_string(),
signature: info.signature.as_ref().map(|s| NuriV0::object_ref(&s)),
author: info.author.clone(),
final_consistency: info.final_consistency,
commit_type: info.commit_type.clone(),
}
}
}
const DID_PREFIX: &str = "did:ng"; const DID_PREFIX: &str = "did:ng";
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -180,6 +207,10 @@ impl NuriV0 {
format!("{DID_PREFIX}:b:{branch_base64}") format!("{DID_PREFIX}:b:{branch_base64}")
} }
pub fn object_ref(obj_ref: &ObjectRef) -> String {
format!("{DID_PREFIX}{}", obj_ref.nuri())
}
pub fn token(token: &Digest) -> String { pub fn token(token: &Digest) -> String {
format!("{DID_PREFIX}:n:{token}") format!("{DID_PREFIX}:n:{token}")
} }
@ -361,6 +392,9 @@ impl AppRequestCommandV0 {
pub fn new_rdf_dump() -> Self { pub fn new_rdf_dump() -> Self {
AppRequestCommandV0::Fetch(AppFetchContentV0::RdfDump) AppRequestCommandV0::Fetch(AppFetchContentV0::RdfDump)
} }
pub fn new_history() -> Self {
AppRequestCommandV0::Fetch(AppFetchContentV0::History)
}
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
@ -603,20 +637,57 @@ pub struct GraphState {
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppState { pub struct AppState {
heads: Vec<ObjectId>, pub heads: Vec<ObjectId>,
graph: Option<GraphState>, // there is always a graph present in the branch. but it might not have been asked in the request pub graph: Option<GraphState>, // there is always a graph present in the branch. but it might not have been asked in the request
discrete: Option<DiscreteState>, pub discrete: Option<DiscreteState>,
pub files: Vec<FileName>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppHistory {
pub heads: Vec<ObjectId>,
pub history: HashMap<ObjectId, CommitInfo>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppHistoryJs {
pub heads: Vec<String>,
pub history: HashMap<String, CommitInfoJs>,
}
impl AppHistory {
pub fn to_js(&self) -> AppHistoryJs {
AppHistoryJs {
heads: self.heads.iter().map(|h| h.to_string()).collect(),
history: HashMap::from_iter(
self.history
.iter()
.map(|(id, info)| (id.to_string(), info.into())),
),
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum OtherPatch {
FileAdd(FileName),
FileRemove(ObjectId),
AsyncSignature((ObjectRef, Vec<ObjectId>)),
Other,
}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppPatch { pub struct AppPatch {
heads: Vec<ObjectId>, pub commit_id: ObjectId,
graph: Option<GraphPatch>, pub commit_info: CommitInfo,
discrete: Option<DiscretePatch>, // or graph, or discrete, or both, or other.
pub graph: Option<GraphPatch>,
pub discrete: Option<DiscretePatch>,
pub other: Option<OtherPatch>,
} }
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct FileName { pub struct FileName {
pub heads: Vec<ObjectId>,
pub name: Option<String>, pub name: Option<String>,
pub reference: ObjectRef, pub reference: ObjectRef,
pub nuri: String, pub nuri: String,
@ -633,8 +704,9 @@ pub enum AppResponseV0 {
SessionStart(AppSessionStartResponse), SessionStart(AppSessionStartResponse),
State(AppState), State(AppState),
Patch(AppPatch), Patch(AppPatch),
History(AppHistory),
Text(String), Text(String),
File(FileName), //File(FileName),
FileUploading(u32), FileUploading(u32),
FileUploaded(ObjectRef), FileUploaded(ObjectRef),
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]

@ -178,7 +178,7 @@ impl Branch {
/// ///
/// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG. /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG.
/// optionally collecting the missing objects/blocks that couldn't be found locally on the way, /// optionally collecting the missing objects/blocks that couldn't be found locally on the way,
/// and also optionally, collecting the commits of theirs found on the way /// and also optionally, collecting the commits of `theirs` found on the way
pub fn load_causal_past( pub fn load_causal_past(
cobj: &Object, cobj: &Object,
store: &Store, store: &Store,

@ -20,6 +20,7 @@ use crate::errors::*;
#[allow(unused_imports)] #[allow(unused_imports)]
use crate::log::*; use crate::log::*;
use crate::object::*; use crate::object::*;
use crate::repo::CommitInfo;
use crate::repo::Repo; use crate::repo::Repo;
use crate::store::Store; use crate::store::Store;
use crate::types::*; use crate::types::*;
@ -322,7 +323,7 @@ impl Commit {
_ => return Err(CommitLoadError::NotACommit), _ => return Err(CommitLoadError::NotACommit),
}; };
commit.set_id(id); commit.set_id(id);
commit.set_key(key.clone()); commit.set_key(key);
match commit.load_body(store) { match commit.load_body(store) {
Ok(_) => return Err(CommitLoadError::MissingBlocks(missing)), Ok(_) => return Err(CommitLoadError::MissingBlocks(missing)),
Err(CommitLoadError::MissingBlocks(mut missing_body)) => { Err(CommitLoadError::MissingBlocks(mut missing_body)) => {
@ -344,7 +345,7 @@ impl Commit {
_ => return Err(CommitLoadError::NotACommit), _ => return Err(CommitLoadError::NotACommit),
}; };
commit.set_id(id); commit.set_id(id);
commit.set_key(key.clone()); commit.set_key(key);
commit.set_header(obj.header().clone()); commit.set_header(obj.header().clone());
if with_body { if with_body {
@ -429,6 +430,34 @@ impl Commit {
} }
} }
/// Get author (a UserId)
pub fn author(&self) -> &Digest {
self.content().author()
}
pub fn final_consistency(&self) -> bool {
self.content().final_consistency()
}
pub fn get_type(&self) -> Option<CommitType> {
self.body().map(|b| b.get_type())
}
pub fn get_signature_reference(&self) -> Option<ObjectRef> {
self.body().map_or(None, |b| b.get_signature_reference())
}
pub fn as_info(&self, repo: &Repo) -> CommitInfo {
CommitInfo {
past: self.acks_ids(),
key: self.key().unwrap(),
signature: None,
author: repo.get_user_string(self.author()),
final_consistency: self.final_consistency(),
commit_type: self.get_type().unwrap(),
}
}
/// Get branch ID this commit is about /// Get branch ID this commit is about
pub fn branch(&self) -> &BranchId { pub fn branch(&self) -> &BranchId {
self.content().branch() self.content().branch()
@ -538,6 +567,28 @@ impl Commit {
res res
} }
/// Get acks (that have an ID in the header, without checking if there is a key for them in the header_keys)
/// if there is no header, returns an empty vec
pub fn acks_ids(&self) -> Vec<ObjectId> {
match self {
Commit::V0(c) => match &c.header {
Some(h) => h.acks(),
None => vec![],
},
}
}
/// Get deps (that have an ID in the header, without checking if there is a key for them in the header_keys)
/// if there is no header, returns an empty vec
pub fn deps_ids(&self) -> Vec<ObjectId> {
match self {
Commit::V0(c) => match &c.header {
Some(h) => h.deps(),
None => vec![],
},
}
}
/// Get files /// Get files
pub fn files(&self) -> Vec<ObjectRef> { pub fn files(&self) -> Vec<ObjectRef> {
let mut res: Vec<ObjectRef> = vec![]; let mut res: Vec<ObjectRef> = vec![];

@ -1004,6 +1004,7 @@ impl fmt::Display for ObjectContent {
("RandomAccessFileMeta", format!("{}", "")) ("RandomAccessFileMeta", format!("{}", ""))
} }
ObjectContentV0::RefreshCap(_c) => ("RefreshCap", format!("{}", "")), ObjectContentV0::RefreshCap(_c) => ("RefreshCap", format!("{}", "")),
ObjectContentV0::Snapshot(_c) => ("Snapshot", format!("size={}", _c.len())),
}, },
), ),
}; };

@ -14,6 +14,8 @@ use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::errors::*; use crate::errors::*;
#[allow(unused_imports)] #[allow(unused_imports)]
use crate::log::*; use crate::log::*;
@ -153,6 +155,16 @@ impl fmt::Display for Repo {
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitInfo {
pub past: Vec<ObjectId>,
pub key: ObjectKey,
pub signature: Option<ObjectRef>,
pub author: String,
pub final_consistency: bool,
pub commit_type: CommitType,
}
impl Repo { impl Repo {
#[cfg(any(test, feature = "testing"))] #[cfg(any(test, feature = "testing"))]
#[allow(deprecated)] #[allow(deprecated)]
@ -161,6 +173,92 @@ impl Repo {
Self::new_with_member(&pub_key, &pub_key, perms, store) Self::new_with_member(&pub_key, &pub_key, perms, store)
} }
pub(crate) fn get_user_string(&self, user_hash: &Digest) -> String {
self.members
.get(user_hash)
.map_or_else(|| format!("t:{user_hash}"), |info| format!("i:{}", info.id))
}
fn load_causal_past(
&self,
cobj: &Commit,
visited: &mut HashMap<ObjectId, CommitInfo>,
) -> Result<(), VerifierError> {
let id = cobj.id().unwrap();
if visited.get(&id).is_none() {
let commit_type = cobj.get_type().unwrap();
let acks = cobj.acks();
let (past, real_acks) = match commit_type {
CommitType::SyncSignature => {
assert_eq!(acks.len(), 1);
let dep = cobj.deps();
assert_eq!(dep.len(), 1);
let mut current_commit = dep[0].clone();
let sign_ref = cobj.get_signature_reference().unwrap();
let real_acks;
loop {
let o = Commit::load(current_commit.clone(), &self.store, true)?;
let deps = o.deps();
let commit_info = CommitInfo {
past: deps.iter().map(|r| r.id.clone()).collect(),
key: o.key().unwrap(),
signature: Some(sign_ref.clone()),
author: self.get_user_string(o.author()),
final_consistency: o.final_consistency(),
commit_type: o.get_type().unwrap(),
};
let id = o.id().unwrap();
visited.insert(id, commit_info);
if id == acks[0].id {
real_acks = o.acks();
break;
}
assert_eq!(deps.len(), 1);
current_commit = deps[0].clone();
}
(vec![dep[0].id], real_acks)
}
CommitType::AsyncSignature => {
let past: Vec<ObjectId> = acks.iter().map(|r| r.id.clone()).collect();
for p in past.iter() {
visited.get_mut(p).unwrap().signature =
Some(cobj.get_signature_reference().unwrap());
}
(past, acks)
}
_ => (acks.iter().map(|r| r.id.clone()).collect(), acks),
};
let commit_info = CommitInfo {
past,
key: cobj.key().unwrap(),
signature: None,
author: self.get_user_string(cobj.author()),
final_consistency: cobj.final_consistency(),
commit_type,
};
visited.insert(id, commit_info);
for past_ref in real_acks {
let o = Commit::load(past_ref, &self.store, true)?;
self.load_causal_past(&o, visited)?;
}
}
Ok(())
}
pub fn history_at_heads(
&self,
heads: &[ObjectRef],
) -> Result<HashMap<ObjectId, CommitInfo>, VerifierError> {
let mut res = HashMap::new();
for id in heads {
if let Ok(cobj) = Commit::load(id.clone(), &self.store, true) {
self.load_causal_past(&cobj, &mut res)?;
}
}
Ok(res)
}
pub fn update_branch_current_heads( pub fn update_branch_current_heads(
&mut self, &mut self,
branch: &BranchId, branch: &BranchId,

@ -1927,9 +1927,8 @@ pub struct SnapshotV0 {
// Branch heads the snapshot was made from, can be useful when shared outside and the commit_header_key is set to None. otherwise it is redundant to ACKS // Branch heads the snapshot was made from, can be useful when shared outside and the commit_header_key is set to None. otherwise it is redundant to ACKS
pub heads: Vec<ObjectId>, pub heads: Vec<ObjectId>,
/// Snapshot data structure /// Reference to Object containing Snapshot data structure
#[serde(with = "serde_bytes")] pub content: ObjectRef,
pub content: Vec<u8>,
} }
/// Snapshot of a Branch /// Snapshot of a Branch
@ -1957,9 +1956,8 @@ pub struct CompactV0 {
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub origin: Vec<u8>, pub origin: Vec<u8>,
/// Snapshot data structure /// Reference to Object containing Snapshot data structure
#[serde(with = "serde_bytes")] pub content: ObjectRef,
pub content: Vec<u8>,
} }
/// Snapshot of a Branch /// Snapshot of a Branch
@ -1989,6 +1987,11 @@ impl AsyncSignature {
// check that the signature object referenced here, is of type threshold_sig Partial // check that the signature object referenced here, is of type threshold_sig Partial
unimplemented!(); unimplemented!();
} }
pub fn reference(&self) -> &ObjectRef {
match self {
Self::V0(v0) => v0,
}
}
} }
/// Sync Threshold Signature of one or a chain of commits . V0 /// Sync Threshold Signature of one or a chain of commits . V0
@ -2013,6 +2016,11 @@ impl SyncSignature {
// check that the signature object referenced here, is of type threshold_sig Total or Owner // check that the signature object referenced here, is of type threshold_sig Total or Owner
unimplemented!(); unimplemented!();
} }
pub fn reference(&self) -> &ObjectRef {
match self {
Self::V0(v0) => v0,
}
}
} }
impl fmt::Display for SyncSignature { impl fmt::Display for SyncSignature {
@ -2347,6 +2355,72 @@ pub enum QuorumType {
IamTheSignature, IamTheSignature,
} }
impl QuorumType {
pub fn final_consistency(&self) -> bool {
match self {
Self::TotalOrder | Self::Owners | Self::IamTheSignature => true,
_ => false,
}
}
}
impl CommitBody {
pub fn get_type(&self) -> CommitType {
match self {
Self::V0(v0) => v0.get_type(),
}
}
pub fn get_signature_reference(&self) -> Option<ObjectRef> {
match self {
Self::V0(v0) => v0.get_signature_reference(),
}
}
}
impl CommitBodyV0 {
pub fn get_type(&self) -> CommitType {
match self {
Self::Branch(_) => CommitType::Branch,
Self::BranchCapRefresh(_) => CommitType::BranchCapRefresh,
Self::UpdateBranch(_) => CommitType::UpdateBranch,
Self::Snapshot(_) => CommitType::Snapshot,
Self::AsyncTransaction(_) => CommitType::Transaction,
Self::SyncTransaction(_) => CommitType::Transaction,
Self::AddFile(_) => CommitType::FileAdd,
Self::RemoveFile(_) => CommitType::FileRemove,
Self::Compact(_) => CommitType::Compact,
Self::AsyncSignature(_) => CommitType::AsyncSignature,
Self::CapRefreshed(_) => CommitType::CapRefreshed,
Self::SyncSignature(_) => CommitType::SyncSignature,
_ => CommitType::Other,
}
}
pub fn get_signature_reference(&self) -> Option<ObjectRef> {
match self {
Self::AsyncSignature(s) => Some(s.reference().clone()),
Self::SyncSignature(s) => Some(s.reference().clone()),
_ => None,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum CommitType {
Transaction,
FileAdd,
FileRemove,
Snapshot,
Compact,
AsyncSignature,
SyncSignature,
Branch,
UpdateBranch,
BranchCapRefresh,
CapRefreshed,
Other,
}
/// Content of a Commit /// Content of a Commit
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CommitContentV0 { pub struct CommitContentV0 {
@ -2404,6 +2478,12 @@ impl CommitContent {
} }
} }
pub fn final_consistency(&self) -> bool {
match self {
CommitContent::V0(v0) => v0.quorum.final_consistency(),
}
}
pub fn author_digest(author: &UserId, overlay: OverlayId) -> Digest { pub fn author_digest(author: &UserId, overlay: OverlayId) -> Digest {
let author_id = serde_bare::to_vec(author).unwrap(); let author_id = serde_bare::to_vec(author).unwrap();
let overlay_id = serde_bare::to_vec(&overlay).unwrap(); let overlay_id = serde_bare::to_vec(&overlay).unwrap();
@ -2445,7 +2525,7 @@ pub struct CommitV0 {
/// Commit content /// Commit content
pub content: CommitContent, pub content: CommitContent,
/// Signature over the content (a CommitContent) by the author. an editor (userId) /// Signature over the content (a CommitContent) by the author. an editor (UserId)
pub sig: Sig, pub sig: Sig,
} }
@ -2562,6 +2642,8 @@ pub enum ObjectContentV0 {
SmallFile(SmallFile), SmallFile(SmallFile),
RandomAccessFileMeta(RandomAccessFileMeta), RandomAccessFileMeta(RandomAccessFileMeta),
RefreshCap(RefreshCap), RefreshCap(RefreshCap),
#[serde(with = "serde_bytes")]
Snapshot(Vec<u8>), // serialization of an AppState
} }
/// Immutable data stored encrypted in a Merkle tree /// Immutable data stored encrypted in a Merkle tree

@ -39,6 +39,9 @@ ng.init_headless(config).then( async() => {
console.log(q); console.log(q);
} }
let history = await ng.branch_history(session.session_id);
console.log(history);
// await ng.sparql_update(session.session_id, "DELETE DATA { <did:ng:t:AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:i> <did:ng:j> }"); // await ng.sparql_update(session.session_id, "DELETE DATA { <did:ng:t:AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:i> <did:ng:j> }");
// await ng.sparql_update(session.session_id, "INSERT DATA { <did:ng:t:AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:i> <did:ng:j> }"); // await ng.sparql_update(session.session_id, "INSERT DATA { <did:ng:t:AJQ5gCLoXXjalC9diTDCvxxWu5ZQUcYWEE821nhVRMcE> <did:ng:i> <did:ng:j> }");

@ -287,6 +287,33 @@ pub async fn rdf_dump(session_id: JsValue) -> Result<String, String> {
} }
} }
#[cfg(wasmpack_target = "nodejs")]
#[wasm_bindgen]
pub async fn branch_history(session_id: JsValue) -> Result<JsValue, String> {
let session_id: u64 = serde_wasm_bindgen::from_value::<u64>(session_id)
.map_err(|_| "Invalid session_id".to_string())?;
let request = AppRequest::V0(AppRequestV0 {
command: AppRequestCommandV0::new_history(),
nuri: NuriV0::new_private_store_target(),
payload: None,
session_id,
});
let res = nextgraph::local_broker::app_request(request)
.await
.map_err(|e: NgError| e.to_string())?;
let AppResponse::V0(res) = res;
match res {
AppResponseV0::History(s) => Ok(s
.to_js()
.serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true))
.unwrap()),
_ => Err("invalid response".to_string()),
}
}
#[cfg(wasmpack_target = "nodejs")] #[cfg(wasmpack_target = "nodejs")]
#[wasm_bindgen] #[wasm_bindgen]
pub async fn admin_create_user(config: JsValue) -> Result<JsValue, String> { pub async fn admin_create_user(config: JsValue) -> Result<JsValue, String> {

@ -18,7 +18,7 @@ use ng_repo::errors::VerifierError;
#[allow(unused_imports)] #[allow(unused_imports)]
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::object::Object; use ng_repo::object::Object;
use ng_repo::repo::{BranchInfo, Repo}; use ng_repo::repo::{BranchInfo, CommitInfo, Repo};
use ng_repo::store::Store; use ng_repo::store::Store;
use ng_repo::types::*; use ng_repo::types::*;
@ -452,18 +452,28 @@ impl CommitVerifier for AddFile {
if files.len() == 1 { if files.len() == 1 {
let refe = commit.files().remove(0); let refe = commit.files().remove(0);
let filename = FileName { let filename = FileName {
heads: vec![], //TODO: put the current heads
name: self.name().clone(), name: self.name().clone(),
nuri: refe.nuri(), nuri: refe.nuri(),
reference: refe, reference: refe,
}; };
let commit_id = commit.id().unwrap();
verifier.user_storage.as_ref().unwrap().branch_add_file( verifier.user_storage.as_ref().unwrap().branch_add_file(
commit.id().unwrap(), commit.id().unwrap(),
*branch_id, *branch_id,
filename.clone(), filename.clone(),
)?; )?;
let repo = verifier.get_repo(repo_id, store.get_store_repo())?;
verifier verifier
.push_app_response(branch_id, AppResponse::V0(AppResponseV0::File(filename))) .push_app_response(
branch_id,
AppResponse::V0(AppResponseV0::Patch(AppPatch {
commit_id,
commit_info: commit.as_info(repo),
graph: None,
discrete: None,
other: Some(OtherPatch::FileAdd(filename)),
})),
)
.await; .await;
Ok(()) Ok(())
} else { } else {

@ -9,6 +9,7 @@
//! Processor for each type of AppRequest //! Processor for each type of AppRequest
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use futures::channel::mpsc; use futures::channel::mpsc;
@ -20,6 +21,7 @@ use ng_repo::errors::*;
use ng_repo::file::{RandomAccessFile, ReadFile}; use ng_repo::file::{RandomAccessFile, ReadFile};
#[allow(unused_imports)] #[allow(unused_imports)]
use ng_repo::log::*; use ng_repo::log::*;
use ng_repo::repo::CommitInfo;
use ng_repo::types::BranchId; use ng_repo::types::BranchId;
use ng_repo::types::StoreRepo; use ng_repo::types::StoreRepo;
use ng_repo::types::*; use ng_repo::types::*;
@ -41,8 +43,11 @@ impl Verifier {
match command { match command {
AppRequestCommandV0::Fetch(fetch) => match fetch { AppRequestCommandV0::Fetch(fetch) => match fetch {
AppFetchContentV0::Subscribe => { AppFetchContentV0::Subscribe => {
let (_, branch_id, _) = self.open_for_target(&nuri.target, false).await?; let (repo_id, branch_id, store_repo) =
Ok(self.create_branch_subscription(branch_id).await?) self.open_for_target(&nuri.target, false).await?;
Ok(self
.create_branch_subscription(repo_id, branch_id, store_repo)
.await?)
} }
_ => unimplemented!(), _ => unimplemented!(),
}, },
@ -221,6 +226,21 @@ impl Verifier {
}) })
} }
fn history_for_nuri(
&self,
target: &NuriTargetV0,
) -> Result<(Vec<ObjectId>, HashMap<ObjectId, CommitInfo>), VerifierError> {
let (repo_id, branch_id, store_repo) = self.resolve_target(target)?; // TODO deal with targets that are commit heads
let repo = self.get_repo(&repo_id, &store_repo)?;
let branch = repo.branch(&branch_id)?;
repo.history_at_heads(&branch.current_heads).map(|history| {
(
branch.current_heads.iter().map(|h| h.id.clone()).collect(),
history,
)
})
}
pub(crate) async fn process( pub(crate) async fn process(
&mut self, &mut self,
command: &AppRequestCommandV0, command: &AppRequestCommandV0,
@ -291,6 +311,18 @@ impl Verifier {
return Ok(AppResponse::V0(AppResponseV0::Text(vec.join("\n")))); return Ok(AppResponse::V0(AppResponseV0::Text(vec.join("\n"))));
} }
AppFetchContentV0::History => {
if !nuri.is_valid_for_sparql_update() {
return Err(NgError::InvalidNuri);
}
return Ok(match self.history_for_nuri(&nuri.target) {
Err(e) => AppResponse::error(e.to_string()),
Ok((heads, history)) => {
AppResponse::V0(AppResponseV0::History(AppHistory { heads, history }))
}
});
}
_ => unimplemented!(), _ => unimplemented!(),
}, },
AppRequestCommandV0::FilePut => match payload { AppRequestCommandV0::FilePut => match payload {

@ -235,12 +235,14 @@ impl Verifier {
pub(crate) async fn create_branch_subscription( pub(crate) async fn create_branch_subscription(
&mut self, &mut self,
branch: BranchId, repo_id: RepoId,
branch_id: BranchId,
store_repo: StoreRepo,
) -> Result<(Receiver<AppResponse>, CancelFn), VerifierError> { ) -> Result<(Receiver<AppResponse>, CancelFn), VerifierError> {
//log_info!("#### create_branch_subscription {}", branch); //log_info!("#### create_branch_subscription {}", branch);
let (tx, rx) = mpsc::unbounded::<AppResponse>(); let (tx, rx) = mpsc::unbounded::<AppResponse>();
//log_info!("SUBSCRIBE"); //log_info!("SUBSCRIBE");
if let Some(returned) = self.branch_subscriptions.insert(branch, tx.clone()) { if let Some(returned) = self.branch_subscriptions.insert(branch_id, tx.clone()) {
//log_info!("RESUBSCRIBE"); //log_info!("RESUBSCRIBE");
if !returned.is_closed() { if !returned.is_closed() {
//log_info!("FORCE CLOSE"); //log_info!("FORCE CLOSE");
@ -249,19 +251,26 @@ impl Verifier {
} }
} }
let repo = self.get_repo(&repo_id, &store_repo)?;
let branch = repo.branch(&branch_id)?;
//let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {}); //let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {});
for file in self let files = self
.user_storage .user_storage
.as_ref() .as_ref()
.unwrap() .unwrap()
.branch_get_all_files(&branch)? .branch_get_all_files(&branch_id)?;
{ let state = AppState {
self.push_app_response(&branch, AppResponse::V0(AppResponseV0::File(file))) heads: branch.current_heads.iter().map(|h| h.id.clone()).collect(),
graph: None,
discrete: None,
files,
};
self.push_app_response(&branch_id, AppResponse::V0(AppResponseV0::State(state)))
.await; .await;
}
let fnonce = Box::new(move || { let fnonce = Box::new(move || {
log_info!("CLOSE_CHANNEL of subscription for branch {}", branch); log_info!("CLOSE_CHANNEL of subscription for branch {}", branch_id);
if !tx.is_closed() { if !tx.is_closed() {
tx.close_channel(); tx.close_channel();
} }

Loading…
Cancel
Save