diff --git a/Cargo.lock b/Cargo.lock index 2918d917..6deb20d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3498,6 +3498,7 @@ name = "ng-sdk-js" version = "0.1.0-preview.1" dependencies = [ "async-std", + "futures", "getrandom 0.1.16", "gloo-timers", "js-sys", diff --git a/ng-app/index-native.html b/ng-app/index-native.html index 395ece1d..223b3c46 100644 --- a/ng-app/index-native.html +++ b/ng-app/index-native.html @@ -56,9 +56,34 @@ /> NextGraph + +
+
+ + + + + + +
+
diff --git a/ng-app/src-tauri/src/lib.rs b/ng-app/src-tauri/src/lib.rs index 50472df2..af7d54f0 100644 --- a/ng-app/src-tauri/src/lib.rs +++ b/ng-app/src-tauri/src/lib.rs @@ -338,6 +338,25 @@ async fn decode_invitation(invite: String) -> Option { decode_invitation_string(invite) } +#[tauri::command(rename_all = "snake_case")] +async fn file_get( + session_id: u64, + stream_id: &str, + reference: BlockRef, + branch_nuri: String, + app: tauri::AppHandle, +) -> Result<(), String> { + let branch_nuri = + NuriV0::new_from(&branch_nuri).map_err(|e| format!("branch_nuri: {}", e.to_string()))?; + let mut nuri = NuriV0::new_from_obj_ref(&reference); + nuri.copy_target_from(&branch_nuri); + + let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri, None); + request.set_session_id(session_id); + + app_request_stream(request, stream_id, app).await +} + #[tauri::command(rename_all = "snake_case")] async fn app_request_stream( request: AppRequest, @@ -382,6 +401,69 @@ async fn app_request_stream( Ok(()) } +#[tauri::command(rename_all = "snake_case")] +async fn file_save_to_downloads( + session_id: u64, + reference: ObjectRef, + filename: String, + branch_nuri: String, + app: tauri::AppHandle, +) -> Result<(), String> { + let branch_nuri = + NuriV0::new_from(&branch_nuri).map_err(|e| format!("branch_nuri: {}", e.to_string()))?; + let mut nuri = NuriV0::new_from_obj_ref(&reference); + nuri.copy_target_from(&branch_nuri); + + let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri, None); + request.set_session_id(session_id); + + let (mut reader, cancel) = nextgraph::local_broker::app_request_stream(request) + .await + .map_err(|e| e.to_string())?; + + let mut file_vec: Vec = vec![]; + while let Some(app_response) = reader.next().await { + match app_response { + AppResponse::V0(AppResponseV0::FileMeta(filemeta)) => { + file_vec = Vec::with_capacity(filemeta.size as usize); + } + AppResponse::V0(AppResponseV0::FileBinary(mut bin)) => { + if !bin.is_empty() { + file_vec.append(&mut bin); + } + } + AppResponse::V0(AppResponseV0::EndOfStream) => break, + _ => return Err("invalid response".to_string()), + } + } + + let mut i: usize = 0; + loop { + let dest_filename = if i == 0 { + filename.clone() + } else { + filename + .rsplit_once(".") + .map(|(l, r)| format!("{l} ({}).{r}", i.to_string())) + .or_else(|| Some(format!("{filename} ({})", i.to_string()))) + .unwrap() + }; + + let path = app + .path() + .resolve(dest_filename, BaseDirectory::Download) + .unwrap(); + + if path.exists() { + i = i + 1; + } else { + write(path, &file_vec).map_err(|e| e.to_string())?; + break; + } + } + Ok(()) +} + #[tauri::command(rename_all = "snake_case")] async fn doc_fetch_private_subscribe() -> Result { let request = AppRequest::new( @@ -494,7 +576,7 @@ async fn sparql_query( } #[tauri::command(rename_all = "snake_case")] -async fn app_request(request: AppRequest, _app: tauri::AppHandle) -> Result { +async fn app_request(request: AppRequest) -> Result { //log_debug!("app request {:?}", request); nextgraph::local_broker::app_request(request) @@ -502,19 +584,40 @@ async fn app_request(request: AppRequest, _app: tauri::AppHandle) -> Result, +) -> Result { + let nuri = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; + + let payload = payload.map(|p| AppRequestPayload::V0(p)); + + let request = AppRequest::V0(AppRequestV0 { + session_id, + command, + nuri, + payload, + }); + + app_request(request).await +} + #[tauri::command(rename_all = "snake_case")] async fn upload_chunk( session_id: u64, upload_id: u32, chunk: serde_bytes::ByteBuf, - nuri: NuriV0, + nuri: String, _app: tauri::AppHandle, ) -> Result { //log_debug!("upload_chunk {:?}", chunk); let mut request = AppRequest::new( AppRequestCommandV0::FilePut, - nuri, + NuriV0::new_from(&nuri).map_err(|e| e.to_string())?, Some(AppRequestPayload::V0( AppRequestPayloadV0::RandomAccessFilePutChunk((upload_id, chunk)), )), @@ -725,7 +828,10 @@ impl AppBuilder { doc_fetch_repo_subscribe, cancel_stream, app_request_stream, + file_get, + file_save_to_downloads, app_request, + app_request_with_nuri_command, upload_chunk, get_device_name, sparql_query, diff --git a/ng-app/src/api.ts b/ng-app/src/api.ts index cd7b595d..9210962b 100644 --- a/ng-app/src/api.ts +++ b/ng-app/src/api.ts @@ -39,6 +39,7 @@ const mapping = { "user_connect": ["info","user_id","location"], "user_disconnect": ["user_id"], "app_request": ["request"], + "app_request_with_nuri_command": ["nuri", "command", "session_id", "payload"], "sparql_query": ["session_id","sparql","nuri"], "sparql_update": ["session_id","sparql","nuri"], "test": [ ], @@ -46,6 +47,7 @@ const mapping = { "doc_fetch_private_subscribe": [], "doc_fetch_repo_subscribe": ["repo_o"], "branch_history": ["session_id", "nuri"], + "file_save_to_downloads": ["session_id", "reference", "filename", "branch_nuri"], } @@ -162,7 +164,42 @@ const handler = { } return ret; } - else if (path[0] === "app_request_stream") { + else if (path[0] === "file_get") { + let stream_id = (lastStreamId += 1).toString(); + //console.log("stream_id",stream_id); + let { getCurrent } = await import("@tauri-apps/plugin-window"); + //let session_id = args[0]; + let callback = args[3]; + + let unlisten = await getCurrent().listen(stream_id, async (event) => { + //console.log(event.payload); + if (event.payload.V0.FileBinary) { + event.payload.V0.FileBinary = Uint8Array.from(event.payload.V0.FileBinary); + } + let ret = callback(event.payload); + if (ret === true) { + await tauri.invoke("cancel_stream", {stream_id}); + } else if (ret.then) { + ret.then(async (val)=> { + if (val === true) { + await tauri.invoke("cancel_stream", {stream_id}); + } + }); + } + }) + try { + await tauri.invoke("file_get",{stream_id, session_id:args[0], reference: args[1], branch_nuri:args[2]}); + } catch (e) { + unlisten(); + await tauri.invoke("cancel_stream", {stream_id}); + throw e; + } + return () => { + unlisten(); + tauri.invoke("cancel_stream", {stream_id}); + } + + } else if (path[0] === "app_request_stream") { let stream_id = (lastStreamId += 1).toString(); //console.log("stream_id",stream_id); let { getCurrent } = await import("@tauri-apps/plugin-window"); @@ -170,7 +207,7 @@ const handler = { let request = args[0]; let callback = args[1]; - let unlisten = await getCurrent().listen(stream_id, (event) => { + let unlisten = await getCurrent().listen(stream_id, async (event) => { //console.log(event.payload); if (event.payload.V0.FileBinary) { event.payload.V0.FileBinary = Uint8Array.from(event.payload.V0.FileBinary); @@ -184,7 +221,16 @@ const handler = { let removes_json_str = new TextDecoder().decode(Uint8Array.from(event.payload.V0.Patch.graph.removes)); event.payload.V0.Patch.graph.removes = JSON.parse(removes_json_str); } - callback(event.payload).then(()=> {}) + let ret = callback(event.payload); + if (ret === true) { + await tauri.invoke("cancel_stream", {stream_id}); + } else if (ret.then) { + ret.then(async (val)=> { + if (val === true) { + await tauri.invoke("cancel_stream", {stream_id}); + } + }); + } }) try { await tauri.invoke("app_request_stream",{stream_id, request}); diff --git a/ng-app/src/apps/ListView.svelte b/ng-app/src/apps/ListView.svelte index e91c46d6..5dbb210a 100644 --- a/ng-app/src/apps/ListView.svelte +++ b/ng-app/src/apps/ListView.svelte @@ -12,7 +12,6 @@ -
-
- {#if $cannot_load_offline} -
- - {@html $t("doc.cannot_load_offline")} - {$t("pages.user_panel.title")}. - -
- {:else} -
- - - - onFileSelected(e)} - bind:this={fileinput} - /> -
- {#if upload_progress !== null} -
- -
- {/if} - {#if commits} - {#await commits.load()} -

{$t("connectivity.loading")}...

- {:then} - {#each $commits.graph as triple} {triple}
{/each} - {#each $commits.heads as head} {head}
{/each} - {#each $commits.files as file} -

- {file.name} - {#await get_blob(file)} -

- -
- {:then url} - {#if url} - {file.name} - {/if} - {/await} -

- {/each} - {/await} - {/if} - {/if} -
diff --git a/ng-app/src/lib/panes/Files.svelte b/ng-app/src/lib/panes/Files.svelte new file mode 100644 index 00000000..5df456bf --- /dev/null +++ b/ng-app/src/lib/panes/Files.svelte @@ -0,0 +1,237 @@ + + + + + +
+
+ + + onFileSelected(e)} + bind:this={fileinput} + /> +
+ {#if upload_progress !== null} +
+ +
+ {/if} + {#if commits} + {#await commits.load()} +

{$t("connectivity.loading")}...

+ {:then} + {#each $commits.files as file} +

+ + {#await get_blob(file, true)} +

+ +
+ {:then url} + {#await isImage(url) then is} + {#if is} + {file.name} + {/if} + {/await} + {file.name}
+ {#if url === false} + {$t("errors.cannot_load_this_file")} + {:else if prepare_url(file.nuri)} + + + {/if} + {/await} +

+ {/each} + {/await} + {/if} + +
\ No newline at end of file diff --git a/ng-app/src/lib/panes/History.svelte b/ng-app/src/lib/panes/History.svelte index 8cb0903b..de91bdf8 100644 --- a/ng-app/src/lib/panes/History.svelte +++ b/ng-app/src/lib/panes/History.svelte @@ -13,9 +13,6 @@ import { branch_subscribe, active_session, - cannot_load_offline, - online, - get_blob } from "../../store"; import { get } from "svelte/store"; import { onMount, onDestroy, tick } from "svelte"; @@ -36,7 +33,7 @@ import { t } from "svelte-i18n"; import { Button, Progressbar, Spinner, Alert } from "flowbite-svelte"; - import { cur_tab, nav_bar, can_have_header, header_icon, header_title, header_description, cur_branch, set_header_in_view, edit_header_button, cur_app, load_official_app } from "../../tab"; + import { cur_tab } from "../../tab"; import ng from "../../api"; import { @@ -139,7 +136,7 @@ {#if commit[1].final_consistency} {:else if commit[1].signature} {/if} - + {#if commit[1].commit_type==="TransactionBoth"}{/if} {commit[0].substring(0,7)}
{commit[1].author.substring(0,9)} diff --git a/ng-app/src/locales/de.json b/ng-app/src/locales/de.json index 0e9d390e..d7eee7b0 100644 --- a/ng-app/src/locales/de.json +++ b/ng-app/src/locales/de.json @@ -74,11 +74,6 @@ "description": "Auf diesen Gerät scheint es noch kein Wallet zu geben.
Wenn du bereits ein Wallet hast, wähle \"Anmelden\". Andernfalls wähle \"Wallet erstellen\".", "create_wallet": "Wallet erstellen" }, - "test": { - "cannot_load_offline": "Du bist offline und nutzt die Web-App. Du musst dich mindestens ein mal mit dem Broker verbinden, bevor du die App lokal nutzen kannst. Das liegt daran, dass die Web-App keine lokalen Kopien deiner Dokumente speichern kann.

Sobald du ein mal verbunden warst, wirst du auch bei Verbindungsabbruch zu einigen Funktionen Zugriff haben. Das Senden von Binärdateien ist dann nicht möglich, da dein Browser nur begrenzte Speicherkapazitäten von circa 5MB zur Verfügung stellt.

Diese Einschränkungen fallen weg, sobald das Feature \"UserStorage for Web\" veröffentlicht wurde!

Deinen Verbindungsstatus siehst du auf", - "add_image": "Bild hinzufügen", - "upload_progress": "Lade hoch..." - }, "login": { "heading": "Wie öffente ich mein Wallet? Es gibt zwei Optionen:", "with_pazzle": "Mit deinem Pazzle", @@ -329,7 +324,7 @@ "ServerError": "Server-Fehler.", "InvalidResponse": "Ungültige Antwort erhalten.", "BootstrapError": "Fehler beim Bootstrapping", - "NotAServerError": "Kein Server.", + "NotAServerError": "Kein Server-Fehler.", "VerifierError": "Fehler während der Überprüfung.", "SiteNotFoundOnBroker": "Die Seite kann nicht auf dem Broker gefunden werden", "BrokerConfigErrorStr": "{error}", diff --git a/ng-app/src/locales/en.json b/ng-app/src/locales/en.json index 19bffc98..07b57912 100644 --- a/ng-app/src/locales/en.json +++ b/ng-app/src/locales/en.json @@ -27,6 +27,11 @@ "chat": "Chat" } }, + "file": { + "download": "Download", + "upload_progress": "Uploading...", + "upload": "Upload file" + }, "errors": { "InvalidNuri": "Invalid NURI" }, @@ -273,11 +278,6 @@ "description": "We could not find a wallet saved on this device.
If you already have a wallet, select \"Log in\", otherwise, select \"Create Wallet\" here below.", "create_wallet": "Create Wallet" }, - "test": { - "cannot_load_offline": "You are offline and using the web app. You need to connect to the broker at least once before you can start using the app locally because the web app does not keep a local copy of your documents.

Once connected, if you lose connectivity again, you will be able to have limited access to some functionalities. Sending binary files won't be possible, because the limit of local storage in your browser is around 5MB.

All those limitations will be lifted once the \"UserStorage for Web\" feature will be released. Stay tuned!

Check your connection status in the ", - "add_image": "Add Image", - "upload_progress": "Uploading..." - }, "login": { "heading": "How to open your wallet? You have 2 options:", "with_pazzle": "With your Pazzle", @@ -550,7 +550,8 @@ "IncompatibleQrCode": "You scanned a NextGraph QR-Code that is of the wrong type.", "NotARendezVous": "You scanned an invalid QR-Code.", "camera_unavailable": "Camera is unavailable.", - "ServerAlreadyRunningInOtherProcess": "App is already running on this device. Check it and close it." + "ServerAlreadyRunningInOtherProcess": "App is already running on this device. Check it and close it.", + "cannot_load_this_file": "Cannot load this file" }, "connectivity": { "stopped": "Stopped", diff --git a/ng-app/src/store.ts b/ng-app/src/store.ts index 14c1aedb..3cb9038d 100644 --- a/ng-app/src/store.ts +++ b/ng-app/src/store.ts @@ -452,11 +452,11 @@ export const branch_subscribe = function(nuri:string, in_tab:boolean) { //console.log("sub"); let already_subscribed = all_branches[nuri]; if (!already_subscribed) { - const { subscribe, set, update } = writable({graph:[], discrete:[], files:[], history: {start:()=>{}, stop:()=>{}, take:()=>{}, commits:false}, heads: []}); // create the underlying writable store + const { subscribe, set, update } = writable({graph:[], discrete:[], files:[], history: {start:()=>{}, stop:()=>{}, commits:false}, heads: []}); // create the underlying writable store // take:()=>{}, update((old)=> { old.history.start = () => update((o) => {o.history.commits = true; return o;}) ; old.history.stop = () => update((o) => {o.history.commits = false; return o;}) ; - old.history.take = () => { let res: boolean | Array<{}> = false; update((o) => {res = o.history.commits; o.history.commits = []; return o;}); return res;} + //old.history.take = () => { let res: boolean | Array<{}> = false; update((o) => {res = o.history.commits; o.history.commits = []; return o;}); return res;} return old;}); let count = 0; let unsub = () => { }; @@ -535,6 +535,10 @@ export const branch_subscribe = function(nuri:string, in_tab:boolean) { } old.graph.sort(); } + tab_update(nuri, ($cur_tab) => { + $cur_tab.branch.files = old.files.length; + return $cur_tab; + }); } else if (response.V0.Patch) { let i = old.heads.length; while (i--) { @@ -570,6 +574,10 @@ export const branch_subscribe = function(nuri:string, in_tab:boolean) { old.graph.sort(); } else if (response.V0.Patch.other?.FileAdd) { old.files.unshift(response.V0.Patch.other.FileAdd); + tab_update(nuri, ($cur_tab) => { + $cur_tab.branch.files = old.files.length; + return $cur_tab; + }); } else { } @@ -637,36 +645,25 @@ export const branch_subscribe = function(nuri:string, in_tab:boolean) { }; let blob_cache = {}; -export async function get_blob(ref: { nuri: string; reference: { key: any; id: any; }; }) { +export async function get_blob(ref: { nuri: string; reference: { key: any; id: any; }; }, only_img: boolean) { if (!ref) return false; const cached = blob_cache[ref.nuri]; - if (cached) { + if (cached && (((await cached) !== true) || only_img )) { return cached; } let prom = new Promise(async (resolve) => { try { - let nuri = { - target: "PrivateStore", - entire_store: false, - access: [{ Key: ref.reference.key }], - locator: [], - object: ref.reference.id, - }; - - let file_request = { - V0: { - command: "FileGet", - nuri, - session_id: get(active_session).session_id, - }, - }; - let final_blob; let content_type; - let unsub = await ng.app_request_stream(file_request, async (blob) => { + let branch_nuri = "did:ng:"+get(cur_tab).branch.nuri; + let cancel = await ng.file_get(get(active_session).session_id, ref.reference, branch_nuri, async (blob) => { //console.log("GOT APP RESPONSE", blob); if (blob.V0.FileMeta) { content_type = blob.V0.FileMeta.content_type; + if (only_img && !content_type.startsWith("image/")) { + resolve(true); + return true;// to cancel + } final_blob = new Blob([], { type: content_type }); } else if (blob.V0.FileBinary) { if (blob.V0.FileBinary.byteLength > 0) { diff --git a/ng-app/src/tab.ts b/ng-app/src/tab.ts index 7481b71a..7c518f5f 100644 --- a/ng-app/src/tab.ts +++ b/ng-app/src/tab.ts @@ -428,7 +428,7 @@ export const save = async () => { } export const all_files_count = derived(cur_tab, ($cur_tab) => { - let total = $cur_tab.branch.attachments + $cur_tab.branch.files; + let total = $cur_tab.branch.files; return total ? `(${total})` : ""; }); diff --git a/ng-broker/src/server_broker.rs b/ng-broker/src/server_broker.rs index ac784603..b82acc2c 100644 --- a/ng-broker/src/server_broker.rs +++ b/ng-broker/src/server_broker.rs @@ -328,7 +328,7 @@ impl IServerBroker for ServerBroker { { let mut state = self.state.write().await; if state.wallet_rendezvous.contains_key(&rendezvous) { - let _ = sender.send(Err(ServerError::BrokerError)); + let _ = sender.send(Err(ServerError::BrokerError)).await; sender.close_channel(); return receiver; } else { diff --git a/ng-net/src/app_protocol.rs b/ng-net/src/app_protocol.rs index 1c99657b..ba665dc9 100644 --- a/ng-net/src/app_protocol.rs +++ b/ng-net/src/app_protocol.rs @@ -187,6 +187,9 @@ pub struct NuriV0 { } impl NuriV0 { + pub fn copy_target_from(&mut self, nuri: &NuriV0) { + self.target = nuri.target.clone(); + } pub fn commit_graph_name(commit_id: &ObjectId, overlay_id: &OverlayId) -> String { format!("{DID_PREFIX}:c:{commit_id}:v:{overlay_id}") } @@ -261,6 +264,20 @@ impl NuriV0 { }) } + pub fn new_from_obj_ref(obj_ref: &ObjectRef) -> Self { + Self { + identity: None, + target: NuriTargetV0::None, + entire_store: false, + object: Some(obj_ref.id), + branch: None, + overlay: None, + access: vec![NgAccessV0::Key(obj_ref.key.clone())], + topic: None, + locator: vec![], + } + } + pub fn new_private_store_target() -> Self { Self { identity: None, @@ -319,7 +336,7 @@ impl NuriV0 { let key = decode_sym_key(k)?; Ok(Self { identity: None, - target: NuriTargetV0::PrivateStore, + target: NuriTargetV0::None, entire_store: false, object: Some(id), branch: None, diff --git a/ng-sdk-js/Cargo.toml b/ng-sdk-js/Cargo.toml index d8247d55..1eb00ae3 100644 --- a/ng-sdk-js/Cargo.toml +++ b/ng-sdk-js/Cargo.toml @@ -29,6 +29,7 @@ getrandom = { version = "0.1.1", features = ["wasm-bindgen"] } rand = { version = "0.7", features = ["getrandom"] } wasm-bindgen = { version = "0.2", features = ["serde-serialize"] } sys-locale = { version = "0.3.1", features = ["js"] } +futures = "0.3.24" ng-repo = { path = "../ng-repo" } ng-net = { path = "../ng-net" } ng-client-ws = { path = "../ng-client-ws" } diff --git a/ng-sdk-js/src/lib.rs b/ng-sdk-js/src/lib.rs index 9b18a106..a28728cd 100644 --- a/ng-sdk-js/src/lib.rs +++ b/ng-sdk-js/src/lib.rs @@ -23,6 +23,8 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; // use js_sys::Reflect; use async_std::stream::StreamExt; +use futures::channel::mpsc; +use futures::SinkExt; use js_sys::{Array, Object}; use oxrdf::Triple; use sys_locale::get_locales; @@ -40,7 +42,7 @@ use ng_net::broker::*; use ng_net::types::{BindAddress, ClientInfo, ClientInfoV0, ClientType, CreateAccountBSP, IP}; use ng_net::utils::{ decode_invitation_string, parse_ip_and_port_for, retrieve_local_bootstrap, retrieve_local_url, - spawn_and_log_error, Receiver, ResultSend, + spawn_and_log_error, Receiver, ResultSend, Sender, }; use ng_net::{actor::*, actors::admin::*}; use ng_net::{WS_PORT, WS_PORT_REVERSE_PROXY}; @@ -51,6 +53,7 @@ use ng_wallet::types::*; use ng_wallet::*; use nextgraph::local_broker::*; +use nextgraph::verifier::CancelFn; use crate::model::*; @@ -811,25 +814,76 @@ pub async fn test() { log_debug!("{:?}", client_info); } +// #[wasm_bindgen] +// pub async fn app_request_stream_with_nuri_command( +// nuri: String, +// command: JsValue, +// session_id: JsValue, +// callback: &js_sys::Function, +// payload: JsValue, +// ) -> Result { +// let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) +// .map_err(|_| "Deserialization error of session_id".to_string())?; +// let nuri = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; + +// let command = serde_wasm_bindgen::from_value::(command) +// .map_err(|_| "Deserialization error of AppRequestCommandV0".to_string())?; + +// let payload = if !payload.is_undefined() && payload.is_object() { +// Some(AppRequestPayload::V0( +// serde_wasm_bindgen::from_value::(payload) +// .map_err(|_| "Deserialization error of AppRequestPayloadV0".to_string())?, +// )) +// } else { +// None +// }; + +// let request = AppRequest::V0(AppRequestV0 { +// session_id, +// command, +// nuri, +// payload, +// }); +// app_request_stream_(request, callback).await +// } + +// #[wasm_bindgen] +// pub async fn app_request_stream( +// // js_session_id: JsValue, +// request: JsValue, +// callback: &js_sys::Function, +// ) -> Result { +// let request = serde_wasm_bindgen::from_value::(request) +// .map_err(|_| "Deserialization error of AppRequest".to_string())?; + +// app_request_stream_(request, callback).await +// } #[wasm_bindgen] pub async fn app_request_stream( // js_session_id: JsValue, request: JsValue, callback: &js_sys::Function, ) -> Result { - // let session_id: u64 = serde_wasm_bindgen::from_value::(js_session_id) - // .map_err(|_| "Deserialization error of session_id".to_string())?; - let request = serde_wasm_bindgen::from_value::(request) .map_err(|_| "Deserialization error of AppRequest".to_string())?; + app_request_stream_(request, callback).await +} + +async fn app_request_stream_( + request: AppRequest, + callback: &js_sys::Function, +) -> Result { let (reader, cancel) = nextgraph::local_broker::app_request_stream(request) .await .map_err(|e: NgError| e.to_string())?; + let (canceller_tx, canceller_rx) = mpsc::unbounded(); + async fn inner_task( mut reader: Receiver, callback: js_sys::Function, + mut canceller_tx: Sender<()>, ) -> ResultSend<()> { while let Some(app_response) = reader.next().await { let app_response = nextgraph::verifier::prepare_app_response_for_js(app_response)?; @@ -881,10 +935,27 @@ pub async fn app_request_stream( Ok(jsval) => { let promise_res: Result = jsval.dyn_into(); match promise_res { - Ok(promise) => { - let _ = JsFuture::from(promise).await; + Ok(promise) => match JsFuture::from(promise).await { + Ok(js_value) => { + if js_value == JsValue::TRUE { + log_debug!("cancel because true"); + reader.close(); + canceller_tx.send(()).await; + canceller_tx.close_channel(); + break; + } + } + Err(_) => {} + }, + Err(returned_val) => { + if returned_val == JsValue::TRUE { + log_debug!("cancel because true"); + reader.close(); + canceller_tx.send(()).await; + canceller_tx.close_channel(); + break; + } } - Err(_) => {} } } Err(e) => { @@ -895,12 +966,23 @@ pub async fn app_request_stream( Ok(()) } - spawn_and_log_error(inner_task(reader, callback.clone())); + async fn inner_canceller(mut canceller_rx: Receiver<()>, cancel: CancelFn) -> ResultSend<()> { + if let Some(_) = canceller_rx.next().await { + log_info!("cancelling"); + cancel(); + } + Ok(()) + } + + spawn_and_log_error(inner_canceller(canceller_rx, cancel)); + + spawn_and_log_error(inner_task(reader, callback.clone(), canceller_tx.clone())); let cb = Closure::once(move || { - log_info!("cancelling"); + log_info!("trying to cancel"); //sender.close_channel() - cancel(); + canceller_tx.unbounded_send(()); + canceller_tx.close_channel(); }); //Closure::wrap(Box::new(move |sender| sender.close_channel()) as Box)>); let ret = cb.as_ref().clone(); @@ -922,6 +1004,43 @@ pub async fn app_request(request: JsValue) -> Result { Ok(serde_wasm_bindgen::to_value(&response).unwrap()) } +#[wasm_bindgen] +pub async fn app_request_with_nuri_command( + nuri: String, + command: JsValue, + session_id: JsValue, + payload: JsValue, +) -> Result { + let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) + .map_err(|_| "Deserialization error of session_id".to_string())?; + let nuri = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; + + let command = serde_wasm_bindgen::from_value::(command) + .map_err(|_| "Deserialization error of AppRequestCommandV0".to_string())?; + + let payload = if !payload.is_undefined() && payload.is_object() { + Some(AppRequestPayload::V0( + serde_wasm_bindgen::from_value::(payload) + .map_err(|_| "Deserialization error of AppRequestPayloadV0".to_string())?, + )) + } else { + None + }; + + let request = AppRequest::V0(AppRequestV0 { + session_id, + command, + nuri, + payload, + }); + + let response = nextgraph::local_broker::app_request(request) + .await + .map_err(|e: NgError| e.to_string())?; + + Ok(serde_wasm_bindgen::to_value(&response).unwrap()) +} + #[wasm_bindgen] pub async fn file_get_from_private_store( session_id: JsValue, @@ -931,54 +1050,49 @@ pub async fn file_get_from_private_store( let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) .map_err(|_| "Deserialization error of session_id".to_string())?; - let nuri = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; + let nuri = NuriV0::new_from(&nuri).map_err(|e| format!("nuri: {}", e.to_string()))?; - let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri.clone(), None); - request.set_session_id(session_id); + let branch_nuri = NuriV0::new_private_store_target(); - let (reader, cancel) = nextgraph::local_broker::app_request_stream(request) - .await - .map_err(|e: NgError| e.to_string())?; + file_get_(session_id, nuri, branch_nuri, callback).await +} - async fn inner_task( - mut reader: Receiver, - callback: js_sys::Function, - ) -> ResultSend<()> { - while let Some(app_response) = reader.next().await { - let response_js = serde_wasm_bindgen::to_value(&app_response).unwrap(); - let this = JsValue::null(); - match callback.call1(&this, &response_js) { - Ok(jsval) => { - let promise_res: Result = jsval.dyn_into(); - match promise_res { - Ok(promise) => { - let _ = JsFuture::from(promise).await; - } - Err(_) => {} - } - } - Err(e) => { - log_err!( - "JS callback for fetch_file_from_private_store failed with {:?}", - e - ); - } - } - } - Ok(()) - } +#[wasm_bindgen] +pub async fn file_get( + session_id: JsValue, + reference: JsValue, + branch_nuri: String, + callback: &js_sys::Function, +) -> Result { + let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) + .map_err(|_| "Deserialization error of session_id".to_string())?; + let reference: BlockRef = serde_wasm_bindgen::from_value::(reference) + .map_err(|_| "Deserialization error of file reference".to_string())?; - spawn_and_log_error(inner_task(reader, callback.clone())); + let branch_nuri = + NuriV0::new_from(&branch_nuri).map_err(|e| format!("branch_nuri: {}", e.to_string()))?; - let cb = Closure::once(move || { - log_info!("cancelling"); - //sender.close_channel() - cancel(); - }); - //Closure::wrap(Box::new(move |sender| sender.close_channel()) as Box)>); - let ret = cb.as_ref().clone(); - cb.forget(); - Ok(ret) + file_get_( + session_id, + NuriV0::new_from_obj_ref(&reference), + branch_nuri, + callback, + ) + .await +} + +async fn file_get_( + session_id: u64, + mut nuri: NuriV0, + branch_nuri: NuriV0, + callback: &js_sys::Function, +) -> Result { + nuri.copy_target_from(&branch_nuri); + + let mut request = AppRequest::new(AppRequestCommandV0::FileGet, nuri, None); + request.set_session_id(session_id); + + app_request_stream_(request, callback).await } async fn do_upload_done( @@ -1083,13 +1197,12 @@ async fn do_upload_start(session_id: u64, nuri: NuriV0, mimetype: String) -> Res #[wasm_bindgen] pub async fn upload_start( session_id: JsValue, - nuri: JsValue, + nuri: String, mimetype: String, ) -> Result { let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) .map_err(|_| "Deserialization error of session_id".to_string())?; - let nuri: NuriV0 = serde_wasm_bindgen::from_value::(nuri) - .map_err(|_| "Deserialization error of nuri".to_string())?; + let nuri: NuriV0 = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; let upload_id = do_upload_start(session_id, nuri, mimetype).await?; @@ -1180,7 +1293,7 @@ pub async fn upload_chunk( session_id: JsValue, upload_id: JsValue, chunk: JsValue, - nuri: JsValue, + nuri: String, ) -> Result { //log_debug!("upload_chunk {:?}", js_nuri); let session_id: u64 = serde_wasm_bindgen::from_value::(session_id) @@ -1189,8 +1302,7 @@ pub async fn upload_chunk( .map_err(|_| "Deserialization error of upload_id".to_string())?; let chunk: serde_bytes::ByteBuf = serde_wasm_bindgen::from_value::(chunk) .map_err(|_| "Deserialization error of chunk".to_string())?; - let nuri: NuriV0 = serde_wasm_bindgen::from_value::(nuri) - .map_err(|_| "Deserialization error of nuri".to_string())?; + let nuri: NuriV0 = NuriV0::new_from(&nuri).map_err(|e| e.to_string())?; let response = do_upload_chunk(session_id, upload_id, chunk, nuri).await?; diff --git a/ng-verifier/src/request_processor.rs b/ng-verifier/src/request_processor.rs index 56bbade6..d23eddc7 100644 --- a/ng-verifier/src/request_processor.rs +++ b/ng-verifier/src/request_processor.rs @@ -109,6 +109,7 @@ impl Verifier { spawn_and_log_error(sending_loop(Arc::new(file), tx.clone())); let fnonce = Box::new(move || { + log_debug!("FileGet cancelled"); tx.close_channel(); }); Ok((rx, fnonce)) diff --git a/ngaccount/web/index.html b/ngaccount/web/index.html index 13335716..5c99ff29 100644 --- a/ngaccount/web/index.html +++ b/ngaccount/web/index.html @@ -56,9 +56,38 @@ /> NextGraph + +
+
+ + + + + + +
+ +
+