working demo of blocks pushed from broker to app

Niko 2 years ago
parent a7c97859da
commit fcbec8474d
  1. 2
      .gitignore
  2. 20
      Cargo.lock
  3. 2
      ng-app/package.json
  4. 4
      ng-app/src-tauri/Cargo.toml
  5. 100
      ng-app/src-tauri/src/lib.rs
  6. 26
      ng-app/src/App.svelte
  7. 64
      ng-app/src/api.ts
  8. 76
      ng-app/src/lib/Greet.svelte
  9. 2
      ng-app/src/routes/Test.svelte
  10. 8
      ng-app/src/routes/URI.svelte
  11. 33
      ng-app/src/store.ts
  12. 5
      ng-sdk-js/Cargo.toml
  13. 98
      ng-sdk-js/src/lib.rs
  14. 117
      p2p-net/src/broker.rs
  15. 2
      p2p-net/src/lib.rs
  16. 1435
      p2p-net/src/tests/file.rs
  17. 1
      p2p-net/src/tests/mod.rs
  18. 4
      p2p-repo/Cargo.toml
  19. 9
      p2p-repo/src/block.rs
  20. 63
      p2p-repo/src/object.rs
  21. 11
      p2p-repo/src/store.rs
  22. 13
      p2p-repo/src/types.rs
  23. 3
      p2p-repo/src/utils.rs
  24. BIN
      p2p-repo/tests/test.jpg
  25. 12
      pnpm-lock.yaml

2
.gitignore vendored

@ -6,3 +6,5 @@
/result* /result*
.DS_Store .DS_Store
node_modules node_modules
/p2p-repo/tests/*.ng
.vscode/settings.json

20
Cargo.lock generated

@ -2480,6 +2480,10 @@ checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54"
name = "ng-app" name = "ng-app"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-std",
"debug_print",
"p2p-net",
"p2p-repo",
"serde", "serde",
"serde_json", "serde_json",
"tauri", "tauri",
@ -2495,13 +2499,16 @@ dependencies = [
"futures", "futures",
"getrandom 0.1.16", "getrandom 0.1.16",
"gloo-timers", "gloo-timers",
"js-sys",
"p2p-client-ws", "p2p-client-ws",
"p2p-net", "p2p-net",
"p2p-repo", "p2p-repo",
"pharos", "pharos",
"serde", "serde",
"serde-wasm-bindgen",
"serde_bare", "serde_bare",
"serde_bytes", "serde_bytes",
"serde_json",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"wasm-bindgen-test", "wasm-bindgen-test",
@ -2817,11 +2824,13 @@ dependencies = [
name = "p2p-repo" name = "p2p-repo"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"base64-url",
"blake3", "blake3",
"chacha20", "chacha20",
"debug_print", "debug_print",
"ed25519-dalek", "ed25519-dalek",
"fastbloom-rs", "fastbloom-rs",
"futures",
"hex", "hex",
"rand 0.7.3", "rand 0.7.3",
"serde", "serde",
@ -3675,6 +3684,17 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "serde-wasm-bindgen"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f3b143e2833c57ab9ad3ea280d21fd34e285a42837aeb0ee301f4f41890fa00e"
dependencies = [
"js-sys",
"serde",
"wasm-bindgen",
]
[[package]] [[package]]
name = "serde_bare" name = "serde_bare"
version = "0.5.0" version = "0.5.0"

@ -8,6 +8,7 @@
"webdev": "cross-env NG_APP_WEB=1 TAURI_DEBUG=1 vite", "webdev": "cross-env NG_APP_WEB=1 TAURI_DEBUG=1 vite",
"webbuild": "cross-env NG_APP_WEB=1 vite build", "webbuild": "cross-env NG_APP_WEB=1 vite build",
"filebuild": "cross-env NG_APP_WEB=1 NG_APP_FILE=1 vite build", "filebuild": "cross-env NG_APP_WEB=1 NG_APP_FILE=1 vite build",
"filebuilddebug": "cross-env NG_APP_WEB=1 NG_APP_FILE=1 TAURI_DEBUG=1 vite build -m debug",
"build": "vite build", "build": "vite build",
"preview": "vite preview", "preview": "vite preview",
"check": "svelte-check --tsconfig ./tsconfig.json", "check": "svelte-check --tsconfig ./tsconfig.json",
@ -16,6 +17,7 @@
"dependencies": { "dependencies": {
"@popperjs/core": "^2.11.8", "@popperjs/core": "^2.11.8",
"@tauri-apps/api": "2.0.0-alpha.4", "@tauri-apps/api": "2.0.0-alpha.4",
"async-proxy": "^0.4.1",
"classnames": "^2.3.2", "classnames": "^2.3.2",
"flowbite": "^1.6.5", "flowbite": "^1.6.5",
"flowbite-svelte": "^0.37.1", "flowbite-svelte": "^0.37.1",

@ -17,9 +17,13 @@ crate-type = ["staticlib", "cdylib", "rlib"]
tauri-build = { version = "2.0.0-alpha.5", features = [] } tauri-build = { version = "2.0.0-alpha.5", features = [] }
[dependencies] [dependencies]
debug_print = "1.0.0"
tauri = { version = "2.0.0-alpha.9", features = [] } tauri = { version = "2.0.0-alpha.9", features = [] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
p2p-repo = { path = "../../p2p-repo" }
p2p-net = { path = "../../p2p-net" }
async-std = { version = "1.12.0", features = ["attributes","unstable"] }
[features] [features]
# this feature is used for production builds or when `devPath` points to the filesystem # this feature is used for production builds or when `devPath` points to the filesystem

@ -6,8 +6,12 @@
// at your option. All files in the project carrying such // at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
// according to those terms. // according to those terms.
use async_std::stream::StreamExt;
use tauri::App; use p2p_net::broker::*;
use p2p_net::log;
use p2p_net::utils::{spawn_and_log_error, Receiver, ResultSend};
use p2p_repo::types::*;
use tauri::{App, Manager};
#[cfg(mobile)] #[cfg(mobile)]
mod mobile; mod mobile;
@ -17,11 +21,92 @@ pub use mobile::*;
pub type SetupHook = Box<dyn FnOnce(&mut App) -> Result<(), Box<dyn std::error::Error>> + Send>; pub type SetupHook = Box<dyn FnOnce(&mut App) -> Result<(), Box<dyn std::error::Error>> + Send>;
// Learn more about Tauri commands at https://tauri.app/v1/guides/features/command // Learn more about Tauri commands at https://tauri.app/v1/guides/features/command
#[tauri::command] #[tauri::command(rename_all = "snake_case")]
fn greet(name: &str) -> String { fn greet(name: &str) -> String {
format!("Hello, {}! You've been greeted from Rust!", name) format!("Hello, {}! You've been greeted from Rust!", name)
} }
#[tauri::command(rename_all = "snake_case")]
async fn test() -> Result<(), ()> {
log!("test is {}", BROKER.read().await.test());
Ok(())
}
#[tauri::command(rename_all = "snake_case")]
async fn create_wallet(name: &str) -> Result<String, ()> {
log!("create wallet from rust {}", name);
Ok(format!("create wallet from rust {}", name))
}
#[tauri::command(rename_all = "snake_case")]
async fn doc_sync_branch(nuri: &str, stream_id: &str, app: tauri::AppHandle) -> Result<(), ()> {
log!("doc_sync_branch {} {}", nuri, stream_id);
let mut reader;
{
let mut sender;
let mut broker = BROKER.write().await;
(reader, sender) = broker.doc_sync_branch(nuri.to_string().clone()).await;
broker.tauri_stream_add(stream_id.to_string(), sender);
}
async fn inner_task(
mut reader: Receiver<Commit>,
stream_id: String,
app: tauri::AppHandle,
) -> ResultSend<()> {
while let Some(commit) = reader.next().await {
app.emit_all(&stream_id, commit).unwrap();
}
BROKER.write().await.tauri_stream_cancel(stream_id);
log!("END OF LOOP");
Ok(())
}
spawn_and_log_error(inner_task(reader, stream_id.to_string(), app));
Ok(())
}
#[tauri::command(rename_all = "snake_case")]
async fn cancel_doc_sync_branch(stream_id: &str) -> Result<(), ()> {
log!("cancel stream {}", stream_id);
BROKER
.write()
.await
.tauri_stream_cancel(stream_id.to_string());
Ok(())
}
#[tauri::command(rename_all = "snake_case")]
async fn doc_get_file_from_store_with_object_ref(
nuri: &str,
obj_ref: ObjectRef,
) -> Result<ObjectContent, String> {
log!(
"doc_get_file_from_store_with_object_ref {} {:?}",
nuri,
obj_ref
);
// let ret = ObjectContent::File(File::V0(FileV0 {
// content_type: "text/plain".to_string(),
// metadata: vec![],
// content: vec![45; 20],
// }));
// Ok(ret)
let obj_content = BROKER
.write()
.await
.get_object_from_store_with_object_ref(nuri.to_string(), obj_ref)
.await
.map_err(|e| e.to_string())?;
Ok(obj_content)
}
#[derive(Default)] #[derive(Default)]
pub struct AppBuilder { pub struct AppBuilder {
setup: Option<SetupHook>, setup: Option<SetupHook>,
@ -50,7 +135,14 @@ impl AppBuilder {
} }
Ok(()) Ok(())
}) })
.invoke_handler(tauri::generate_handler![greet]) .invoke_handler(tauri::generate_handler![
test,
greet,
create_wallet,
doc_sync_branch,
cancel_doc_sync_branch,
doc_get_file_from_store_with_object_ref
])
.run(tauri::generate_context!()) .run(tauri::generate_context!())
.expect("error while running tauri application"); .expect("error while running tauri application");
} }

@ -10,28 +10,24 @@
--> -->
<script lang="ts"> <script lang="ts">
// this line is needed to have the SDK working when compiling for a single file bundle (pnpm filebuild)
//import * as api from "ng-sdk-js";
import Router from "svelte-spa-router"; import Router from "svelte-spa-router";
import { onMount, tick } from "svelte";
import Home from "./routes/Home.svelte"; import Home from "./routes/Home.svelte";
import Test from "./routes/Test.svelte"; import Test from "./routes/Test.svelte";
import URI from "./routes/URI.svelte";
import NotFound from "./routes/NotFound.svelte"; import NotFound from "./routes/NotFound.svelte";
import ng from "./api";
if (import.meta.env.NG_APP_WEB) { ng.test();
import("ng-sdk-js").then((ng2) => {
ng2.test();
});
}
const routes = { const routes = new Map();
// Exact path routes.set("/", Home);
"/": Home, routes.set("/test", Test);
routes.set(/^\/ng(.*)/i, URI);
"/test": Test, routes.set("*", NotFound);
// Catch-all
// This is optional, but if present it must be the last
"*": NotFound,
};
</script> </script>
<main> <main>

@ -0,0 +1,64 @@
// Copyright (c) 2022-2023 Niko Bonnieure, Par le Peuple, NextGraph.org developers
// All rights reserved.
// Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except
// according to those terms.
import {createAsyncProxy} from "async-proxy";
import { writable } from "svelte/store";
const mapping = {
"create_wallet": [ "name" ],
"doc_get_file_from_store_with_object_ref": [ "nuri","obj_ref" ],
"test": [ ]
}
let lastStreamId = 0;
const handler = {
async apply(target, path, caller, args) {
if (import.meta.env.NG_APP_WEB) {
let sdk = await import("ng-sdk-js")
return Reflect.apply(sdk[path], caller, args)
} else {
let tauri = await import("@tauri-apps/api/tauri");
if (path[0] === "doc_sync_branch") {
let stream_id = (lastStreamId += 1).toString();
console.log("stream_id",stream_id);
let { listen } = await import("@tauri-apps/api/event");
let nuri = args[0];
let callback = args[1];
let unlisten = await listen(stream_id, (event) => {
callback(event.payload).then(()=> {})
})
await tauri.invoke("doc_sync_branch",{nuri, stream_id});
return () => {
unlisten();
tauri.invoke("cancel_doc_sync_branch", {stream_id});
}
} else if (path[0] === "doc_get_file_from_store_with_object_ref") {
let arg = {};
args.map((el,ix) => arg[mapping[path[0]][ix]]=el)
let res = await tauri.invoke(path[0],arg);
res['File'].V0.content = Uint8Array.from(res['File'].V0.content);
res['File'].V0.metadata = Uint8Array.from(res['File'].V0.metadata);
return res
}
else {
let arg = {};
args.map((el,ix) => arg[mapping[path[0]][ix]]=el)
return tauri.invoke(path[0],arg)
}
}
},
};
const api = createAsyncProxy({}, handler);
export default api;

@ -10,28 +10,58 @@
--> -->
<script lang="ts"> <script lang="ts">
import ng from "../api";
import branch_commits from "../store";
let name = ""; let name = "";
let greetMsg = ""; let greetMsg = "";
let ng; let cancel = () => {};
let url = "";
let commits = branch_commits("ok", false);
if (import.meta.env.NG_APP_WEB) { let img_map = {};
import("ng-sdk-js").then((ng2) => {
ng = { async function get_img(ref) {
greet: async function (n) { if (!ref) return false;
ng2.test(); let cache = img_map[ref];
return "greetings from web " + n; if (cache) {
}, console.log("got it from cache");
}; return cache;
}); }
} else { try {
import("@tauri-apps/api/tauri").then((tauri) => { let file = await ng.doc_get_file_from_store_with_object_ref("ng:", ref);
ng = { greet: (n) => tauri.invoke("greet", { name: n }) }; console.log(file);
}); var blob = new Blob([file["File"].V0.content], {
type: file["File"].V0.content_type,
});
var imageUrl = URL.createObjectURL(blob);
img_map[ref] = imageUrl;
return imageUrl;
} catch (e) {
console.error(e);
return false;
}
} }
async function greet() { async function greet() {
// Learn more about Tauri commands at https://tauri.app/v1/guides/features/command greetMsg = await ng.create_wallet(name);
greetMsg = await ng?.greet(name); // cancel = await ng.doc_sync_branch("ok", async (commit) => {
// console.log(commit);
// try {
// let file = await ng.doc_get_file_from_store_with_object_ref(
// "ng:",
// commit.V0.content.refs[0]
// );
// console.log(file);
// var blob = new Blob([file["File"].V0.content], {
// type: file["File"].V0.content_type,
// });
// var imageUrl = URL.createObjectURL(blob);
// url = imageUrl;
// } catch (e) {
// console.error(e);
// }
// });
//cancel();
} }
</script> </script>
@ -39,6 +69,20 @@
<div class="row"> <div class="row">
<input id="greet-input" placeholder="Enter a name..." bind:value={name} /> <input id="greet-input" placeholder="Enter a name..." bind:value={name} />
<button on:click={greet}> Greet </button> <button on:click={greet}> Greet </button>
<button on:click={cancel}> Cancel </button>
</div> </div>
<p>{greetMsg}</p> <p>{greetMsg}</p>
{#await commits.load()}
<p>Currently loading...</p>
{:then}
{#each $commits as commit}
<p>
{#await get_img(commit.V0.content.refs[0]) then url}
{#if url}
<img src={url} />
{/if}
{/await}
</p>
{/each}
{/await}
</div> </div>

@ -11,11 +11,11 @@
<script lang="ts"> <script lang="ts">
import Greet from "../lib/Greet.svelte"; import Greet from "../lib/Greet.svelte";
export let params = {};
</script> </script>
<main class="container2"> <main class="container2">
<h1>Welcome to test</h1> <h1>Welcome to test</h1>
<div class="row"> <div class="row">
<Greet /> <Greet />
</div> </div>

@ -0,0 +1,8 @@
<script>
// The params prop contains values matched from the URL
export let params = {};
console.log(params);
</script>
<p>nextgraph URI {params[1]}</p>

@ -0,0 +1,33 @@
import { writable } from "svelte/store";
import ng from "./api";
const branch_commits = (nura, sub) => {
const { subscribe, set, update } = writable([]); // create the underlying writable store
let unsub = () => {};
return {
load: async () => {
unsub = await ng.doc_sync_branch(nura, async (commit) => {
console.log(commit);
update( (old) => {old.unshift(commit); return old;} )
});
},
subscribe: (run, invalid) => {
let upper_unsub = subscribe(run, invalid);
return () => {
upper_unsub();
unsub();
}
}
// set: (value) => {
// localStorage.setItem(key, toString(value)); // save also to local storage as a string
// return set(value);
// },
// update,
};
};
export default branch_commits;

@ -28,14 +28,15 @@ serde_bare = "0.5.0"
serde_bytes = "0.11.7" serde_bytes = "0.11.7"
# snow = "0.9.2" # snow = "0.9.2"
getrandom = { version = "0.1.1", features = ["wasm-bindgen"] } getrandom = { version = "0.1.1", features = ["wasm-bindgen"] }
serde_json = "1.0"
# [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom] # [target.'cfg(target_arch = "wasm32")'.dependencies.getrandom]
# version = "0.2.7" # version = "0.2.7"
# features = ["js"] # features = ["js"]
[target.'cfg(target_arch = "wasm32")'.dependencies] [target.'cfg(target_arch = "wasm32")'.dependencies]
# js-sys = "0.3.61" js-sys = "0.3.61"
serde-wasm-bindgen = "0.5"
wasm-bindgen-futures = "0.4.34" wasm-bindgen-futures = "0.4.34"
# web-sys = { version = "0.3.61", features = ["Window"] } # web-sys = { version = "0.3.61", features = ["Window"] }
gloo-timers = "0.2.6" gloo-timers = "0.2.6"

@ -12,19 +12,24 @@
use async_std::task; use async_std::task;
// #[cfg(target_arch = "wasm32")] // #[cfg(target_arch = "wasm32")]
// use js_sys::Reflect; // use js_sys::Reflect;
use async_std::stream::StreamExt;
#[cfg(target_arch = "wasm32")]
use js_sys::Uint8Array;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket; use p2p_client_ws::remote_ws_wasm::ConnectionWebSocket;
use p2p_net::broker::*; use p2p_net::broker::*;
use p2p_net::connection::{ClientConfig, StartConfig}; use p2p_net::connection::{ClientConfig, StartConfig};
use p2p_net::types::{DirectPeerId, IP}; use p2p_net::types::{DirectPeerId, IP};
use p2p_net::utils::{spawn_and_log_error, ResultSend}; use p2p_net::utils::{spawn_and_log_error, Receiver, ResultSend, Sender};
use p2p_net::{log, sleep}; use p2p_net::{log, sleep};
use p2p_repo::types::PubKey; use p2p_repo::types::*;
use p2p_repo::utils::generate_keypair; use p2p_repo::utils::generate_keypair;
use serde_json::json;
use std::net::IpAddr; use std::net::IpAddr;
use std::str::FromStr; use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use wasm_bindgen::prelude::*; use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::{future_to_promise, JsFuture};
#[wasm_bindgen] #[wasm_bindgen]
extern "C" { extern "C" {
@ -43,12 +48,101 @@ extern "C" {
fn random(max: usize) -> usize; fn random(max: usize) -> usize;
} }
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn create_wallet(s: String) -> String {
log!("create wallet {} {}", s, BROKER.read().await.test());
format!("create wallet from js {}", s)
}
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub async fn test() { pub async fn test() {
log!("test is {}", BROKER.read().await.test()); log!("test is {}", BROKER.read().await.test());
} }
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn doc_get_file_from_store_with_object_ref(
nuri: String,
obj_ref_js: JsValue,
) -> Result<JsValue, JsValue> {
let obj_ref = serde_wasm_bindgen::from_value::<ObjectRef>(obj_ref_js).unwrap();
log!(
"doc_get_file {} {:?} {}",
nuri,
obj_ref.id,
BROKER.read().await.test()
);
// let vec: Vec<u8> = vec![2; 10];
// let view = unsafe { Uint8Array::view(&vec) };
// let x = JsValue::from(Uint8Array::new(view.as_ref()));
// let ret = ObjectContent::File(File::V0(FileV0 {
// content_type: "text/plain".to_string(),
// metadata: vec![],
// content: vec![45; 20],
// }));
let obj_content = BROKER
.write()
.await
.get_object_from_store_with_object_ref(nuri, obj_ref)
.await
.map_err(|e| e.to_string())?;
Ok(serde_wasm_bindgen::to_value(&obj_content).unwrap())
}
#[cfg(target_arch = "wasm32")]
#[wasm_bindgen]
pub async fn doc_sync_branch(anuri: String, callback: &js_sys::Function) -> JsValue {
let vec: Vec<u8> = vec![2; 10];
let view = unsafe { Uint8Array::view(&vec) };
let x = JsValue::from(Uint8Array::new(view.as_ref()));
let mut reader;
let mut sender;
{
(reader, sender) = BROKER.write().await.doc_sync_branch(anuri.clone()).await;
}
async fn inner_task(
mut reader: Receiver<Commit>,
anuri: String,
callback: js_sys::Function,
) -> ResultSend<()> {
while let Some(commit) = reader.next().await {
let xx = serde_wasm_bindgen::to_value(&commit).unwrap();
//let xx = JsValue::from(json!(commit).to_string());
//let _ = callback.call1(&this, &xx);
let this = JsValue::null();
let jsval: JsValue = callback.call1(&this, &xx).unwrap();
let promise_res: Result<js_sys::Promise, JsValue> = jsval.dyn_into();
match promise_res {
Ok(promise) => {
JsFuture::from(promise).await;
}
Err(_) => {}
}
}
log!("END OF LOOP");
Ok(())
}
spawn_and_log_error(inner_task(reader, anuri, callback.clone()));
let cb = Closure::once(move || {
log!("close channel");
sender.close_channel()
});
//Closure::wrap(Box::new(move |sender| sender.close_channel()) as Box<FnMut(Sender<Commit>)>);
let ret = cb.as_ref().clone();
cb.forget();
return ret;
}
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]
#[wasm_bindgen] #[wasm_bindgen]
pub async fn start() { pub async fn start() {

@ -23,12 +23,18 @@ use futures::SinkExt;
use noise_protocol::U8Array; use noise_protocol::U8Array;
use noise_rust_crypto::sensitive::Sensitive; use noise_rust_crypto::sensitive::Sensitive;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use p2p_repo::types::{PrivKey, PubKey}; use p2p_repo::object::Object;
use p2p_repo::object::ObjectParseError;
use p2p_repo::store::HashMapRepoStore;
use p2p_repo::types::*;
use p2p_repo::utils::generate_keypair; use p2p_repo::utils::generate_keypair;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::ops::Deref; use std::ops::Deref;
use std::io::BufReader;
use std::io::Read;
#[derive(Debug)] #[derive(Debug)]
pub enum PeerConnection { pub enum PeerConnection {
Core(IP), Core(IP),
@ -62,9 +68,117 @@ pub struct Broker {
closing: bool, closing: bool,
test: u32, test: u32,
tauri_streams: HashMap<String, Sender<Commit>>,
} }
impl Broker { impl Broker {
/// helper function to store the sender of a tauri stream in order to be able to cancel it later on
/// only used in Tauri, not used in the JS SDK
pub fn tauri_stream_add(&mut self, stream_id: String, sender: Sender<Commit>) {
self.tauri_streams.insert(stream_id, sender);
}
/// helper function to cancel a tauri stream
/// /// only used in Tauri, not used in the JS SDK
pub fn tauri_stream_cancel(&mut self, stream_id: String) {
let s = self.tauri_streams.remove(&stream_id);
if let Some(sender) = s {
sender.close_channel();
}
}
pub async fn get_block_from_store_with_block_id(
&mut self,
nuri: String,
id: BlockId,
include_children: bool,
) -> Result<Receiver<Block>, ProtocolError> {
// TODO
let (mut tx, rx) = mpsc::unbounded::<Block>();
//log!("cur {}", std::env::current_dir().unwrap().display());
//Err(ProtocolError::AccessDenied)
// let f = std::fs::File::open(
// "../p2p-repo/tests/e4e4b57524ce29df826055c368894e912ab03af46f61f6270b4c8796bc6f4221.ng",
// )
// .expect("open of block.ng");
// let mut reader = BufReader::new(f);
// let mut block_buffer: Vec<u8> = Vec::new();
// reader
// .read_to_end(&mut block_buffer)
// .expect("read of test.ng");
let block = serde_bare::from_slice::<Block>(&crate::tests::file::test).unwrap();
tx.send(block).await;
Ok(rx)
}
pub async fn get_object_from_store_with_object_ref(
&mut self,
nuri: String,
obj_ref: ObjectRef,
) -> Result<ObjectContent, ProtocolError> {
let blockstream = self
.get_block_from_store_with_block_id(nuri, obj_ref.id, true)
.await?;
let store = HashMapRepoStore::from_block_stream(blockstream).await;
Object::load(obj_ref.id, Some(obj_ref.key), &store)
.map_err(|e| match e {
ObjectParseError::MissingBlocks(_missing) => ProtocolError::MissingBlocks,
_ => ProtocolError::ObjectParseError,
})?
.content()
.map_err(|_| ProtocolError::ObjectParseError)
}
pub async fn doc_sync_branch(&mut self, anuri: String) -> (Receiver<Commit>, Sender<Commit>) {
let (mut tx, rx) = mpsc::unbounded::<Commit>();
let obj_ref = ObjectRef {
id: ObjectId::Blake3Digest32([
228, 228, 181, 117, 36, 206, 41, 223, 130, 96, 85, 195, 104, 137, 78, 145, 42, 176,
58, 244, 111, 97, 246, 39, 11, 76, 135, 150, 188, 111, 66, 33,
]),
key: SymKey::ChaCha20Key([
100, 243, 39, 242, 203, 131, 102, 50, 9, 54, 248, 113, 4, 160, 28, 45, 73, 56, 217,
112, 95, 150, 144, 137, 9, 57, 106, 5, 39, 202, 146, 94,
]),
};
let refs = vec![obj_ref];
let metadata = vec![5u8; 55];
let expiry = None;
let (member_privkey, member_pubkey) = generate_keypair();
let commit = Commit::new(
member_privkey,
member_pubkey,
1,
obj_ref,
vec![],
vec![],
refs,
metadata,
obj_ref,
expiry,
)
.unwrap();
async fn send(mut tx: Sender<Commit>, commit: Commit) -> ResultSend<()> {
while let Ok(_) = tx.send(commit.clone()).await {
log!("sending");
sleep!(std::time::Duration::from_secs(3));
}
log!("end of sending");
Ok(())
}
spawn_and_log_error(send(tx.clone(), commit));
(rx, tx.clone())
}
pub fn reconnecting(&mut self, peer_id: &DirectPeerId) { pub fn reconnecting(&mut self, peer_id: &DirectPeerId) {
let peerinfo = self.peers.get_mut(peer_id); let peerinfo = self.peers.get_mut(peer_id);
match peerinfo { match peerinfo {
@ -108,6 +222,7 @@ impl Broker {
shutdown_sender, shutdown_sender,
direct_connections: HashMap::new(), direct_connections: HashMap::new(),
peers: HashMap::new(), peers: HashMap::new(),
tauri_streams: HashMap::new(),
closing: false, closing: false,
test: u32::from_be_bytes(random_buf), test: u32::from_be_bytes(random_buf),
} }

@ -25,6 +25,8 @@ pub mod actors;
pub mod utils; pub mod utils;
pub mod tests;
pub static WS_PORT: u16 = 1025; pub static WS_PORT: u16 = 1025;
#[cfg(target_arch = "wasm32")] #[cfg(target_arch = "wasm32")]

File diff suppressed because it is too large Load Diff

@ -0,0 +1 @@
pub mod file;

@ -17,4 +17,6 @@ serde_bare = "0.5.0"
serde_bytes = "0.11.7" serde_bytes = "0.11.7"
fastbloom-rs = "0.5.3" fastbloom-rs = "0.5.3"
debug_print = "1.0.0" debug_print = "1.0.0"
hex = "0.4.3" hex = "0.4.3"
futures = "0.3.24"
base64-url = "2.0.0"

@ -3,7 +3,7 @@
// This code is partly derived from work written by TG x Thoth from P2Pcollab. // This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth // Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0 // Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0> // <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, // or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such // at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
@ -35,6 +35,13 @@ impl BlockV0 {
} }
} }
impl From<Digest> for String {
fn from(id: BlockId) -> Self {
base64_url::encode(&serde_bare::to_vec(&id).unwrap())
//hex::encode(to_vec(&id).unwrap())
}
}
impl Block { impl Block {
pub fn new( pub fn new(
children: Vec<BlockId>, children: Vec<BlockId>,

@ -3,7 +3,7 @@
// This code is partly derived from work written by TG x Thoth from P2Pcollab. // This code is partly derived from work written by TG x Thoth from P2Pcollab.
// Copyright 2022 TG x Thoth // Copyright 2022 TG x Thoth
// Licensed under the Apache License, Version 2.0 // Licensed under the Apache License, Version 2.0
// <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0> // <LICENSE-APACHE2 or http://www.apache.org/licenses/LICENSE-2.0>
// or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>, // or the MIT license <LICENSE-MIT or http://opensource.org/licenses/MIT>,
// at your option. All files in the project carrying such // at your option. All files in the project carrying such
// notice may not be copied, modified, or distributed except // notice may not be copied, modified, or distributed except
@ -194,6 +194,7 @@ impl Object {
) -> Object { ) -> Object {
// create blocks by chunking + encrypting content // create blocks by chunking + encrypting content
let valid_block_size = store_valid_value_size(block_size); let valid_block_size = store_valid_value_size(block_size);
println!("valid_block_size {}", valid_block_size);
let data_chunk_size = valid_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA; let data_chunk_size = valid_block_size - EMPTY_BLOCK_SIZE - DATA_VARINT_EXTRA;
let mut blocks: Vec<Block> = vec![]; let mut blocks: Vec<Block> = vec![];
@ -301,6 +302,7 @@ impl Object {
for id in parents { for id in parents {
match store.get(&id) { match store.get(&id) {
Ok(block) => { Ok(block) => {
//FIXME: remove the block.clone()
blocks.insert(0, block.clone()); blocks.insert(0, block.clone());
match block { match block {
Block::V0(o) => { Block::V0(o) => {
@ -561,6 +563,9 @@ mod test {
use crate::object::*; use crate::object::*;
use crate::store::*; use crate::store::*;
use crate::types::*; use crate::types::*;
use std::io::BufReader;
use std::io::Read;
use std::io::Write;
// Those constants are calculated with RepoStore::get_max_value_size // Those constants are calculated with RepoStore::get_max_value_size
@ -571,11 +576,61 @@ mod test {
/// Maximum data that can fit in object.content /// Maximum data that can fit in object.content
const MAX_DATA_PAYLOAD_SIZE: usize = 2097112; const MAX_DATA_PAYLOAD_SIZE: usize = 2097112;
/// Test JPEG file
#[test]
pub fn test_jpg() {
let f = std::fs::File::open("tests/test.jpg").expect("open of tests/test.jpg");
let mut reader = BufReader::new(f);
let mut img_buffer: Vec<u8> = Vec::new();
reader
.read_to_end(&mut img_buffer)
.expect("read of test.jpg");
let file = File::V0(FileV0 {
content_type: "image/jpeg".into(),
metadata: vec![],
content: img_buffer,
});
let content = ObjectContent::File(file);
let deps: Vec<ObjectId> = vec![Digest::Blake3Digest32([9; 32])];
let exp = Some(2u32.pow(31));
let max_object_size = store_max_value_size();
let repo_secret = SymKey::ChaCha20Key([0; 32]);
let repo_pubkey = PubKey::Ed25519PubKey([1; 32]);
let obj = Object::new(
content,
vec![],
exp,
max_object_size,
repo_pubkey,
repo_secret,
);
println!("obj.id: {:?}", obj.id());
println!("obj.key: {:?}", obj.key());
println!("obj.blocks.len: {:?}", obj.blocks().len());
let mut i = 0;
for node in obj.blocks() {
println!("#{}: {:?}", i, node.id());
let mut file = std::fs::File::create(format!("tests/{}.ng", node.id()))
.expect("open block write file");
let ser_file = serde_bare::to_vec(node).unwrap();
file.write_all(&ser_file);
println!("{:?}", ser_file);
i += 1;
}
}
/// Test tree API /// Test tree API
#[test] #[test]
pub fn test_object() { pub fn test_object() {
let file = File::V0(FileV0 { let file = File::V0(FileV0 {
content_type: Vec::from("file/test"), content_type: "file/test".into(),
metadata: Vec::from("some meta data here"), metadata: Vec::from("some meta data here"),
content: [(0..255).collect::<Vec<u8>>().as_slice(); 320].concat(), content: [(0..255).collect::<Vec<u8>>().as_slice(); 320].concat(),
}); });
@ -683,7 +738,7 @@ mod test {
let deps: Vec<ObjectId> = vec![Digest::Blake3Digest32([9; 32])]; let deps: Vec<ObjectId> = vec![Digest::Blake3Digest32([9; 32])];
let empty_file = ObjectContent::File(File::V0(FileV0 { let empty_file = ObjectContent::File(File::V0(FileV0 {
content_type: vec![], content_type: "".into(),
metadata: vec![], metadata: vec![],
content: vec![], content: vec![],
})); }));
@ -699,7 +754,7 @@ mod test {
println!("file size: {}", size); println!("file size: {}", size);
let content = ObjectContent::File(File::V0(FileV0 { let content = ObjectContent::File(File::V0(FileV0 {
content_type: vec![], content_type: "".into(),
metadata: vec![], metadata: vec![],
content: vec![99; size], content: vec![99; size],
})); }));

@ -11,7 +11,10 @@
//! Block store //! Block store
use futures::StreamExt;
use crate::types::*; use crate::types::*;
use crate::utils::Receiver;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::{ use std::{
@ -77,6 +80,14 @@ impl HashMapRepoStore {
} }
} }
pub async fn from_block_stream(mut blockstream: Receiver<Block>) -> Self {
let this = Self::new();
while let Some(block) = blockstream.next().await {
this.put(&block).unwrap();
}
this
}
pub fn get_len(&self) -> usize { pub fn get_len(&self) -> usize {
self.blocks.read().unwrap().len() self.blocks.read().unwrap().len()
} }

@ -219,11 +219,11 @@ pub struct Site {
/// BLAKE3 hash of the RepoId /// BLAKE3 hash of the RepoId
pub type RepoHash = Digest; pub type RepoHash = Digest;
impl From<RepoHash> for String { // impl From<RepoHash> for String {
fn from(id: RepoHash) -> Self { // fn from(id: RepoHash) -> Self {
hex::encode(to_vec(&id).unwrap()) // hex::encode(to_vec(&id).unwrap())
} // }
} // }
/// RepoId is a PubKey /// RepoId is a PubKey
pub type RepoId = PubKey; pub type RepoId = PubKey;
@ -594,8 +594,7 @@ pub enum Commit {
/// File Object /// File Object
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct FileV0 { pub struct FileV0 {
#[serde(with = "serde_bytes")] pub content_type: String,
pub content_type: Vec<u8>,
#[serde(with = "serde_bytes")] #[serde(with = "serde_bytes")]
pub metadata: Vec<u8>, pub metadata: Vec<u8>,

@ -13,6 +13,7 @@ use crate::errors::*;
use crate::types::*; use crate::types::*;
use ed25519_dalek::*; use ed25519_dalek::*;
use futures::channel::mpsc;
use rand::rngs::OsRng; use rand::rngs::OsRng;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
@ -121,3 +122,5 @@ pub fn now_timestamp() -> Timestamp {
.try_into() .try_into()
.unwrap() .unwrap()
} }
pub type Receiver<T> = mpsc::UnboundedReceiver<T>;

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

@ -13,6 +13,7 @@ importers:
'@tauri-apps/cli': 2.0.0-alpha.9 '@tauri-apps/cli': 2.0.0-alpha.9
'@tsconfig/svelte': ^3.0.0 '@tsconfig/svelte': ^3.0.0
'@types/node': ^18.7.10 '@types/node': ^18.7.10
async-proxy: ^0.4.1
autoprefixer: ^10.4.14 autoprefixer: ^10.4.14
classnames: ^2.3.2 classnames: ^2.3.2
cross-env: ^7.0.3 cross-env: ^7.0.3
@ -37,6 +38,7 @@ importers:
dependencies: dependencies:
'@popperjs/core': 2.11.8 '@popperjs/core': 2.11.8
'@tauri-apps/api': 2.0.0-alpha.4 '@tauri-apps/api': 2.0.0-alpha.4
async-proxy: 0.4.1
classnames: 2.3.2 classnames: 2.3.2
flowbite: 1.6.5 flowbite: 1.6.5
flowbite-svelte: 0.37.3_svelte@3.59.1 flowbite-svelte: 0.37.3_svelte@3.59.1
@ -611,6 +613,12 @@ packages:
resolution: {integrity: sha512-PYjyFOLKQ9y57JvQ6QLo8dAgNqswh8M1RMJYdQduT6xbWSgK36P/Z/v+p888pM69jMMfS8Xd8F6I1kQ/I9HUGg==} resolution: {integrity: sha512-PYjyFOLKQ9y57JvQ6QLo8dAgNqswh8M1RMJYdQduT6xbWSgK36P/Z/v+p888pM69jMMfS8Xd8F6I1kQ/I9HUGg==}
dev: true dev: true
/async-proxy/0.4.1:
resolution: {integrity: sha512-4e+zNtoGL4+cnqib8v169CnKcRfAsAubp2EsjBhAA5jyW7jjI3t36rVvuqLwmhtliwf8JvSnxinE4ecQN+DK4w==}
dependencies:
object-path-operator: 3.0.0
dev: false
/autoprefixer/10.4.14_postcss@8.4.24: /autoprefixer/10.4.14_postcss@8.4.24:
resolution: {integrity: sha512-FQzyfOsTlwVzjHxKEqRIAdJx9niO6VCBCoEwax/VLSoQF29ggECcPuBqUMZ+u8jCZOPSy8b8/8KnuFbp0SaFZQ==} resolution: {integrity: sha512-FQzyfOsTlwVzjHxKEqRIAdJx9niO6VCBCoEwax/VLSoQF29ggECcPuBqUMZ+u8jCZOPSy8b8/8KnuFbp0SaFZQ==}
engines: {node: ^10 || ^12 || >=14} engines: {node: ^10 || ^12 || >=14}
@ -1256,6 +1264,10 @@ packages:
engines: {node: '>= 6'} engines: {node: '>= 6'}
dev: true dev: true
/object-path-operator/3.0.0:
resolution: {integrity: sha512-Z7dlPUeXqRU/lLfGerP24dPC66n7ehyXaTM81k71EFlsaaEjOHkf4/uq1WGicfGfiO7snYShneE1YZZUkyRiLQ==}
dev: false
/once/1.4.0: /once/1.4.0:
resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==} resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==}
dependencies: dependencies:

Loading…
Cancel
Save