fix streamed API orm_start

feat/orm-diffs
Niko PLP 18 hours ago
parent df05d2e69c
commit 7bbbbe8df2
  1. 4
      engine/broker/auth/src/App.svelte
  2. 4
      sdk/js/signals/src/connector/ormConnectionHandler.ts
  3. 30
      sdk/js/web/src/index.ts

@ -296,8 +296,8 @@
const method = e.data.method;
const args = e.data.args;
const port = e.data.port;
// TODO: add other stream RPC methods
if ( method === "doc_subscribe" ) {
// TODO: add other streamed RPC methods
if ( method === "doc_subscribe" || method === "orm_start" ) {
//console.log("processing streamed request ...",method, args);
args.push((callbacked)=> {
port.postMessage({stream:true, ret:callbacked});

@ -78,12 +78,16 @@ export class OrmConnection<T extends BaseType> {
// Establish connection to wasm land.
await new Promise((resolve) => setTimeout(resolve, 100));
try {
ng.orm_start(
scope,
shapeType,
this.sessionId,
this.onBackendMessage
);
} catch (e) {
console.error(e)
}
});
}

@ -38,25 +38,41 @@ export const init = async function(callback:Function | null, singleton:boolean,
}
}
const streamed_api: Record<string,number> = {
"doc_subscribe": 2,
"orm_start": 3
};
function rpc( method:string, args?: any) : Promise<any> {
const { port1, port2 } = new MessageChannel();
//console.log("POSTING",method, args);
if (method==="doc_subscribe") { //TODO: add all the streamed functions
let callback = args[2];
let new_args = [args[0],args[1]];
let callback_idx = streamed_api[method];
if (callback_idx) { //TODO: add all the streamed functions
let callback = args[callback_idx];
let new_args = args.slice(0, -1);
parent.postMessage({ method, args:new_args, port: port2 }, config.origin, [port2]);
let unsub = new Promise(async (resolve)=> {
let unsub = new Promise(async (resolve, reject)=> {
let resolved = false;
port1.onmessage = async (m) => {
if (m.data.stream) {
if (!resolved) {
resolve(()=>{
port1.close();
});
});
port1.onmessage = async (m) => {
if (m.data.stream) {
resolved = true;
}
await (callback)(m.data.ret);
} else if (!m.data.ok) {
if (!resolved) {
reject(new Error(m.data.ret));
resolved= true;
} else {
throw new Error(m.data.ret);
}
}
};
});
//port2.onclose = ()
return unsub;

Loading…
Cancel
Save