diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 903f179..d29fec9 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -9,7 +9,10 @@ //! App Protocol (between LocalBroker and Verifier) +use std::collections::HashMap; + use lazy_static::lazy_static; +use ng_repo::repo::CommitInfo; use ng_repo::utils::decode_overlayid; use regex::Regex; use serde::{Deserialize, Serialize}; @@ -46,6 +49,7 @@ pub enum AppFetchContentV0 { ReadQuery, // more to be detailed WriteQuery, // more to be detailed RdfDump, + History, } impl AppFetchContentV0 { @@ -134,6 +138,29 @@ impl NuriTargetV0 { } } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct CommitInfoJs { + pub past: Vec, + pub key: String, + pub signature: Option, + pub author: String, + pub final_consistency: bool, + pub commit_type: CommitType, +} + +impl From<&CommitInfo> for CommitInfoJs { + fn from(info: &CommitInfo) -> Self { + CommitInfoJs { + past: info.past.iter().map(|objid| objid.to_string()).collect(), + key: info.key.to_string(), + signature: info.signature.as_ref().map(|s| NuriV0::object_ref(&s)), + author: info.author.clone(), + final_consistency: info.final_consistency, + commit_type: info.commit_type.clone(), + } + } +} + const DID_PREFIX: &str = "did:ng"; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -180,6 +207,10 @@ impl NuriV0 { format!("{DID_PREFIX}:b:{branch_base64}") } + pub fn object_ref(obj_ref: &ObjectRef) -> String { + format!("{DID_PREFIX}{}", obj_ref.nuri()) + } + pub fn token(token: &Digest) -> String { format!("{DID_PREFIX}:n:{token}") } @@ -361,6 +392,9 @@ impl AppRequestCommandV0 { pub fn new_rdf_dump() -> Self { AppRequestCommandV0::Fetch(AppFetchContentV0::RdfDump) } + pub fn new_history() -> Self { + AppRequestCommandV0::Fetch(AppFetchContentV0::History) + } } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -603,20 +637,57 @@ pub struct GraphState { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AppState { - heads: Vec, - graph: Option, // there is always a graph present in the branch. but it might not have been asked in the request - discrete: Option, + pub heads: Vec, + pub graph: Option, // there is always a graph present in the branch. but it might not have been asked in the request + pub discrete: Option, + pub files: Vec, } + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AppHistory { + pub heads: Vec, + pub history: HashMap, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AppHistoryJs { + pub heads: Vec, + pub history: HashMap, +} + +impl AppHistory { + pub fn to_js(&self) -> AppHistoryJs { + AppHistoryJs { + heads: self.heads.iter().map(|h| h.to_string()).collect(), + history: HashMap::from_iter( + self.history + .iter() + .map(|(id, info)| (id.to_string(), info.into())), + ), + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum OtherPatch { + FileAdd(FileName), + FileRemove(ObjectId), + AsyncSignature((ObjectRef, Vec)), + Other, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct AppPatch { - heads: Vec, - graph: Option, - discrete: Option, + pub commit_id: ObjectId, + pub commit_info: CommitInfo, + // or graph, or discrete, or both, or other. + pub graph: Option, + pub discrete: Option, + pub other: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] pub struct FileName { - pub heads: Vec, pub name: Option, pub reference: ObjectRef, pub nuri: String, @@ -633,8 +704,9 @@ pub enum AppResponseV0 { SessionStart(AppSessionStartResponse), State(AppState), Patch(AppPatch), + History(AppHistory), Text(String), - File(FileName), + //File(FileName), FileUploading(u32), FileUploaded(ObjectRef), #[serde(with = "serde_bytes")] diff --git a/ng-repo/src/branch.rs b/ng-repo/src/branch.rs index cc44220..e0679ba 100644 --- a/ng-repo/src/branch.rs +++ b/ng-repo/src/branch.rs @@ -178,7 +178,7 @@ impl Branch { /// /// and collect in `visited` the ObjectIds encountered on the way, stopping at any commit already belonging to `theirs` or the root of DAG. /// optionally collecting the missing objects/blocks that couldn't be found locally on the way, - /// and also optionally, collecting the commits of theirs found on the way + /// and also optionally, collecting the commits of `theirs` found on the way pub fn load_causal_past( cobj: &Object, store: &Store, diff --git a/ng-repo/src/commit.rs b/ng-repo/src/commit.rs index 2f059cd..0b6a9d5 100644 --- a/ng-repo/src/commit.rs +++ b/ng-repo/src/commit.rs @@ -20,6 +20,7 @@ use crate::errors::*; #[allow(unused_imports)] use crate::log::*; use crate::object::*; +use crate::repo::CommitInfo; use crate::repo::Repo; use crate::store::Store; use crate::types::*; @@ -322,7 +323,7 @@ impl Commit { _ => return Err(CommitLoadError::NotACommit), }; commit.set_id(id); - commit.set_key(key.clone()); + commit.set_key(key); match commit.load_body(store) { Ok(_) => return Err(CommitLoadError::MissingBlocks(missing)), Err(CommitLoadError::MissingBlocks(mut missing_body)) => { @@ -344,7 +345,7 @@ impl Commit { _ => return Err(CommitLoadError::NotACommit), }; commit.set_id(id); - commit.set_key(key.clone()); + commit.set_key(key); commit.set_header(obj.header().clone()); if with_body { @@ -429,6 +430,34 @@ impl Commit { } } + /// Get author (a UserId) + pub fn author(&self) -> &Digest { + self.content().author() + } + + pub fn final_consistency(&self) -> bool { + self.content().final_consistency() + } + + pub fn get_type(&self) -> Option { + self.body().map(|b| b.get_type()) + } + + pub fn get_signature_reference(&self) -> Option { + self.body().map_or(None, |b| b.get_signature_reference()) + } + + pub fn as_info(&self, repo: &Repo) -> CommitInfo { + CommitInfo { + past: self.acks_ids(), + key: self.key().unwrap(), + signature: None, + author: repo.get_user_string(self.author()), + final_consistency: self.final_consistency(), + commit_type: self.get_type().unwrap(), + } + } + /// Get branch ID this commit is about pub fn branch(&self) -> &BranchId { self.content().branch() @@ -538,6 +567,28 @@ impl Commit { res } + /// Get acks (that have an ID in the header, without checking if there is a key for them in the header_keys) + /// if there is no header, returns an empty vec + pub fn acks_ids(&self) -> Vec { + match self { + Commit::V0(c) => match &c.header { + Some(h) => h.acks(), + None => vec![], + }, + } + } + + /// Get deps (that have an ID in the header, without checking if there is a key for them in the header_keys) + /// if there is no header, returns an empty vec + pub fn deps_ids(&self) -> Vec { + match self { + Commit::V0(c) => match &c.header { + Some(h) => h.deps(), + None => vec![], + }, + } + } + /// Get files pub fn files(&self) -> Vec { let mut res: Vec = vec![]; diff --git a/ng-repo/src/object.rs b/ng-repo/src/object.rs index 2a6cdb7..64768c0 100644 --- a/ng-repo/src/object.rs +++ b/ng-repo/src/object.rs @@ -1004,6 +1004,7 @@ impl fmt::Display for ObjectContent { ("RandomAccessFileMeta", format!("{}", "")) } ObjectContentV0::RefreshCap(_c) => ("RefreshCap", format!("{}", "")), + ObjectContentV0::Snapshot(_c) => ("Snapshot", format!("size={}", _c.len())), }, ), }; diff --git a/ng-repo/src/repo.rs b/ng-repo/src/repo.rs index e10cebc..ddb3c77 100644 --- a/ng-repo/src/repo.rs +++ b/ng-repo/src/repo.rs @@ -14,6 +14,8 @@ use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; +use serde::{Deserialize, Serialize}; + use crate::errors::*; #[allow(unused_imports)] use crate::log::*; @@ -153,6 +155,16 @@ impl fmt::Display for Repo { } } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct CommitInfo { + pub past: Vec, + pub key: ObjectKey, + pub signature: Option, + pub author: String, + pub final_consistency: bool, + pub commit_type: CommitType, +} + impl Repo { #[cfg(any(test, feature = "testing"))] #[allow(deprecated)] @@ -161,6 +173,92 @@ impl Repo { Self::new_with_member(&pub_key, &pub_key, perms, store) } + pub(crate) fn get_user_string(&self, user_hash: &Digest) -> String { + self.members + .get(user_hash) + .map_or_else(|| format!("t:{user_hash}"), |info| format!("i:{}", info.id)) + } + + fn load_causal_past( + &self, + cobj: &Commit, + visited: &mut HashMap, + ) -> Result<(), VerifierError> { + let id = cobj.id().unwrap(); + if visited.get(&id).is_none() { + let commit_type = cobj.get_type().unwrap(); + let acks = cobj.acks(); + let (past, real_acks) = match commit_type { + CommitType::SyncSignature => { + assert_eq!(acks.len(), 1); + let dep = cobj.deps(); + assert_eq!(dep.len(), 1); + let mut current_commit = dep[0].clone(); + let sign_ref = cobj.get_signature_reference().unwrap(); + let real_acks; + loop { + let o = Commit::load(current_commit.clone(), &self.store, true)?; + let deps = o.deps(); + let commit_info = CommitInfo { + past: deps.iter().map(|r| r.id.clone()).collect(), + key: o.key().unwrap(), + signature: Some(sign_ref.clone()), + author: self.get_user_string(o.author()), + final_consistency: o.final_consistency(), + commit_type: o.get_type().unwrap(), + }; + let id = o.id().unwrap(); + visited.insert(id, commit_info); + if id == acks[0].id { + real_acks = o.acks(); + break; + } + assert_eq!(deps.len(), 1); + current_commit = deps[0].clone(); + } + (vec![dep[0].id], real_acks) + } + CommitType::AsyncSignature => { + let past: Vec = acks.iter().map(|r| r.id.clone()).collect(); + for p in past.iter() { + visited.get_mut(p).unwrap().signature = + Some(cobj.get_signature_reference().unwrap()); + } + (past, acks) + } + _ => (acks.iter().map(|r| r.id.clone()).collect(), acks), + }; + + let commit_info = CommitInfo { + past, + key: cobj.key().unwrap(), + signature: None, + author: self.get_user_string(cobj.author()), + final_consistency: cobj.final_consistency(), + commit_type, + }; + visited.insert(id, commit_info); + for past_ref in real_acks { + let o = Commit::load(past_ref, &self.store, true)?; + self.load_causal_past(&o, visited)?; + } + } + Ok(()) + } + + pub fn history_at_heads( + &self, + heads: &[ObjectRef], + ) -> Result, VerifierError> { + let mut res = HashMap::new(); + for id in heads { + if let Ok(cobj) = Commit::load(id.clone(), &self.store, true) { + self.load_causal_past(&cobj, &mut res)?; + } + } + Ok(res) + } + pub fn update_branch_current_heads( &mut self, branch: &BranchId, diff --git a/ng-repo/src/types.rs b/ng-repo/src/types.rs index 2db0748..f832513 100644 --- a/ng-repo/src/types.rs +++ b/ng-repo/src/types.rs @@ -1927,9 +1927,8 @@ pub struct SnapshotV0 { // Branch heads the snapshot was made from, can be useful when shared outside and the commit_header_key is set to None. otherwise it is redundant to ACKS pub heads: Vec, - /// Snapshot data structure - #[serde(with = "serde_bytes")] - pub content: Vec, + /// Reference to Object containing Snapshot data structure + pub content: ObjectRef, } /// Snapshot of a Branch @@ -1957,9 +1956,8 @@ pub struct CompactV0 { #[serde(with = "serde_bytes")] pub origin: Vec, - /// Snapshot data structure - #[serde(with = "serde_bytes")] - pub content: Vec, + /// Reference to Object containing Snapshot data structure + pub content: ObjectRef, } /// Snapshot of a Branch @@ -1989,6 +1987,11 @@ impl AsyncSignature { // check that the signature object referenced here, is of type threshold_sig Partial unimplemented!(); } + pub fn reference(&self) -> &ObjectRef { + match self { + Self::V0(v0) => v0, + } + } } /// Sync Threshold Signature of one or a chain of commits . V0 @@ -2013,6 +2016,11 @@ impl SyncSignature { // check that the signature object referenced here, is of type threshold_sig Total or Owner unimplemented!(); } + pub fn reference(&self) -> &ObjectRef { + match self { + Self::V0(v0) => v0, + } + } } impl fmt::Display for SyncSignature { @@ -2347,6 +2355,72 @@ pub enum QuorumType { IamTheSignature, } +impl QuorumType { + pub fn final_consistency(&self) -> bool { + match self { + Self::TotalOrder | Self::Owners | Self::IamTheSignature => true, + _ => false, + } + } +} + +impl CommitBody { + pub fn get_type(&self) -> CommitType { + match self { + Self::V0(v0) => v0.get_type(), + } + } + pub fn get_signature_reference(&self) -> Option { + match self { + Self::V0(v0) => v0.get_signature_reference(), + } + } +} + +impl CommitBodyV0 { + pub fn get_type(&self) -> CommitType { + match self { + Self::Branch(_) => CommitType::Branch, + Self::BranchCapRefresh(_) => CommitType::BranchCapRefresh, + Self::UpdateBranch(_) => CommitType::UpdateBranch, + Self::Snapshot(_) => CommitType::Snapshot, + Self::AsyncTransaction(_) => CommitType::Transaction, + Self::SyncTransaction(_) => CommitType::Transaction, + Self::AddFile(_) => CommitType::FileAdd, + Self::RemoveFile(_) => CommitType::FileRemove, + Self::Compact(_) => CommitType::Compact, + Self::AsyncSignature(_) => CommitType::AsyncSignature, + Self::CapRefreshed(_) => CommitType::CapRefreshed, + Self::SyncSignature(_) => CommitType::SyncSignature, + _ => CommitType::Other, + } + } + + pub fn get_signature_reference(&self) -> Option { + match self { + Self::AsyncSignature(s) => Some(s.reference().clone()), + Self::SyncSignature(s) => Some(s.reference().clone()), + _ => None, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum CommitType { + Transaction, + FileAdd, + FileRemove, + Snapshot, + Compact, + AsyncSignature, + SyncSignature, + Branch, + UpdateBranch, + BranchCapRefresh, + CapRefreshed, + Other, +} + /// Content of a Commit #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct CommitContentV0 { @@ -2404,6 +2478,12 @@ impl CommitContent { } } + pub fn final_consistency(&self) -> bool { + match self { + CommitContent::V0(v0) => v0.quorum.final_consistency(), + } + } + pub fn author_digest(author: &UserId, overlay: OverlayId) -> Digest { let author_id = serde_bare::to_vec(author).unwrap(); let overlay_id = serde_bare::to_vec(&overlay).unwrap(); @@ -2445,7 +2525,7 @@ pub struct CommitV0 { /// Commit content pub content: CommitContent, - /// Signature over the content (a CommitContent) by the author. an editor (userId) + /// Signature over the content (a CommitContent) by the author. an editor (UserId) pub sig: Sig, } @@ -2562,6 +2642,8 @@ pub enum ObjectContentV0 { SmallFile(SmallFile), RandomAccessFileMeta(RandomAccessFileMeta), RefreshCap(RefreshCap), + #[serde(with = "serde_bytes")] + Snapshot(Vec), // serialization of an AppState } /// Immutable data stored encrypted in a Merkle tree diff --git a/ng-sdk-js/app-node/index.js b/ng-sdk-js/app-node/index.js index fabe7c3..fe19206 100644 --- a/ng-sdk-js/app-node/index.js +++ b/ng-sdk-js/app-node/index.js @@ -39,6 +39,9 @@ ng.init_headless(config).then( async() => { console.log(q); } + let history = await ng.branch_history(session.session_id); + console.log(history); + // await ng.sparql_update(session.session_id, "DELETE DATA { }"); // await ng.sparql_update(session.session_id, "INSERT DATA { }"); diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index e5cb233..38bd6bd 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -287,6 +287,33 @@ pub async fn rdf_dump(session_id: JsValue) -> Result { } } +#[cfg(wasmpack_target = "nodejs")] +#[wasm_bindgen] +pub async fn branch_history(session_id: JsValue) -> Result { + let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) + .map_err(|_| "Invalid session_id".to_string())?; + + let request = AppRequest::V0(AppRequestV0 { + command: AppRequestCommandV0::new_history(), + nuri: NuriV0::new_private_store_target(), + payload: None, + session_id, + }); + + let res = nextgraph::local_broker::app_request(request) + .await + .map_err(|e: NgError| e.to_string())?; + + let AppResponse::V0(res) = res; + match res { + AppResponseV0::History(s) => Ok(s + .to_js() + .serialize(&serde_wasm_bindgen::Serializer::new().serialize_maps_as_objects(true)) + .unwrap()), + _ => Err("invalid response".to_string()), + } +} + #[cfg(wasmpack_target = "nodejs")] #[wasm_bindgen] pub async fn admin_create_user(config: JsValue) -> Result { diff --git a/ng-verifier/src/commits/mod.rs b/ng-verifier/src/commits/mod.rs index dfc9709..36d506f 100644 --- a/ng-verifier/src/commits/mod.rs +++ b/ng-verifier/src/commits/mod.rs @@ -18,7 +18,7 @@ use ng_repo::errors::VerifierError; #[allow(unused_imports)] use ng_repo::log::*; use ng_repo::object::Object; -use ng_repo::repo::{BranchInfo, Repo}; +use ng_repo::repo::{BranchInfo, CommitInfo, Repo}; use ng_repo::store::Store; use ng_repo::types::*; @@ -452,18 +452,28 @@ impl CommitVerifier for AddFile { if files.len() == 1 { let refe = commit.files().remove(0); let filename = FileName { - heads: vec![], //TODO: put the current heads name: self.name().clone(), nuri: refe.nuri(), reference: refe, }; + let commit_id = commit.id().unwrap(); verifier.user_storage.as_ref().unwrap().branch_add_file( commit.id().unwrap(), *branch_id, filename.clone(), )?; + let repo = verifier.get_repo(repo_id, store.get_store_repo())?; verifier - .push_app_response(branch_id, AppResponse::V0(AppResponseV0::File(filename))) + .push_app_response( + branch_id, + AppResponse::V0(AppResponseV0::Patch(AppPatch { + commit_id, + commit_info: commit.as_info(repo), + graph: None, + discrete: None, + other: Some(OtherPatch::FileAdd(filename)), + })), + ) .await; Ok(()) } else { diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index b51f41f..db3ef30 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -9,6 +9,7 @@ //! Processor for each type of AppRequest +use std::collections::HashMap; use std::sync::Arc; use futures::channel::mpsc; @@ -20,6 +21,7 @@ use ng_repo::errors::*; use ng_repo::file::{RandomAccessFile, ReadFile}; #[allow(unused_imports)] use ng_repo::log::*; +use ng_repo::repo::CommitInfo; use ng_repo::types::BranchId; use ng_repo::types::StoreRepo; use ng_repo::types::*; @@ -41,8 +43,11 @@ impl Verifier { match command { AppRequestCommandV0::Fetch(fetch) => match fetch { AppFetchContentV0::Subscribe => { - let (_, branch_id, _) = self.open_for_target(&nuri.target, false).await?; - Ok(self.create_branch_subscription(branch_id).await?) + let (repo_id, branch_id, store_repo) = + self.open_for_target(&nuri.target, false).await?; + Ok(self + .create_branch_subscription(repo_id, branch_id, store_repo) + .await?) } _ => unimplemented!(), }, @@ -221,6 +226,21 @@ impl Verifier { }) } + fn history_for_nuri( + &self, + target: &NuriTargetV0, + ) -> Result<(Vec, HashMap), VerifierError> { + let (repo_id, branch_id, store_repo) = self.resolve_target(target)?; // TODO deal with targets that are commit heads + let repo = self.get_repo(&repo_id, &store_repo)?; + let branch = repo.branch(&branch_id)?; + repo.history_at_heads(&branch.current_heads).map(|history| { + ( + branch.current_heads.iter().map(|h| h.id.clone()).collect(), + history, + ) + }) + } + pub(crate) async fn process( &mut self, command: &AppRequestCommandV0, @@ -291,6 +311,18 @@ impl Verifier { return Ok(AppResponse::V0(AppResponseV0::Text(vec.join("\n")))); } + AppFetchContentV0::History => { + if !nuri.is_valid_for_sparql_update() { + return Err(NgError::InvalidNuri); + } + + return Ok(match self.history_for_nuri(&nuri.target) { + Err(e) => AppResponse::error(e.to_string()), + Ok((heads, history)) => { + AppResponse::V0(AppResponseV0::History(AppHistory { heads, history })) + } + }); + } _ => unimplemented!(), }, AppRequestCommandV0::FilePut => match payload { diff --git a/ng-verifier/src/verifier.rs b/ng-verifier/src/verifier.rs index 8069637..8ca94b1 100644 --- a/ng-verifier/src/verifier.rs +++ b/ng-verifier/src/verifier.rs @@ -235,12 +235,14 @@ impl Verifier { pub(crate) async fn create_branch_subscription( &mut self, - branch: BranchId, + repo_id: RepoId, + branch_id: BranchId, + store_repo: StoreRepo, ) -> Result<(Receiver, CancelFn), VerifierError> { //log_info!("#### create_branch_subscription {}", branch); let (tx, rx) = mpsc::unbounded::(); //log_info!("SUBSCRIBE"); - if let Some(returned) = self.branch_subscriptions.insert(branch, tx.clone()) { + if let Some(returned) = self.branch_subscriptions.insert(branch_id, tx.clone()) { //log_info!("RESUBSCRIBE"); if !returned.is_closed() { //log_info!("FORCE CLOSE"); @@ -249,19 +251,26 @@ impl Verifier { } } + let repo = self.get_repo(&repo_id, &store_repo)?; + let branch = repo.branch(&branch_id)?; + //let tx = self.branch_subscriptions.entry(branch).or_insert_with(|| {}); - for file in self + let files = self .user_storage .as_ref() .unwrap() - .branch_get_all_files(&branch)? - { - self.push_app_response(&branch, AppResponse::V0(AppResponseV0::File(file))) - .await; - } + .branch_get_all_files(&branch_id)?; + let state = AppState { + heads: branch.current_heads.iter().map(|h| h.id.clone()).collect(), + graph: None, + discrete: None, + files, + }; + self.push_app_response(&branch_id, AppResponse::V0(AppResponseV0::State(state))) + .await; let fnonce = Box::new(move || { - log_info!("CLOSE_CHANNEL of subscription for branch {}", branch); + log_info!("CLOSE_CHANNEL of subscription for branch {}", branch_id); if !tx.is_closed() { tx.close_channel(); }