diff --git a/engine/broker/auth/src/App.svelte b/engine/broker/auth/src/App.svelte index 449f53e..7036a9a 100644 --- a/engine/broker/auth/src/App.svelte +++ b/engine/broker/auth/src/App.svelte @@ -296,8 +296,8 @@ const method = e.data.method; const args = e.data.args; const port = e.data.port; - // TODO: add other stream RPC methods - if ( method === "doc_subscribe" ) { + // TODO: add other streamed RPC methods + if ( method === "doc_subscribe" || method === "orm_start" ) { //console.log("processing streamed request ...",method, args); args.push((callbacked)=> { port.postMessage({stream:true, ret:callbacked}); diff --git a/sdk/js/signals/src/connector/ormConnectionHandler.ts b/sdk/js/signals/src/connector/ormConnectionHandler.ts index 2ecdb90..e928170 100644 --- a/sdk/js/signals/src/connector/ormConnectionHandler.ts +++ b/sdk/js/signals/src/connector/ormConnectionHandler.ts @@ -78,12 +78,16 @@ export class OrmConnection { // Establish connection to wasm land. await new Promise((resolve) => setTimeout(resolve, 100)); - ng.orm_start( - scope, - shapeType, - this.sessionId, - this.onBackendMessage - ); + try { + ng.orm_start( + scope, + shapeType, + this.sessionId, + this.onBackendMessage + ); + } catch (e) { + console.error(e) + } }); } diff --git a/sdk/js/web/src/index.ts b/sdk/js/web/src/index.ts index d18aa35..f9f04c6 100644 --- a/sdk/js/web/src/index.ts +++ b/sdk/js/web/src/index.ts @@ -38,25 +38,41 @@ export const init = async function(callback:Function | null, singleton:boolean, } } +const streamed_api: Record = { + "doc_subscribe": 2, + "orm_start": 3 +}; + function rpc( method:string, args?: any) : Promise { const { port1, port2 } = new MessageChannel(); //console.log("POSTING",method, args); - if (method==="doc_subscribe") { //TODO: add all the streamed functions - let callback = args[2]; - let new_args = [args[0],args[1]]; + let callback_idx = streamed_api[method]; + if (callback_idx) { //TODO: add all the streamed functions + let callback = args[callback_idx]; + let new_args = args.slice(0, -1); parent.postMessage({ method, args:new_args, port: port2 }, config.origin, [port2]); - let unsub = new Promise(async (resolve)=> { - resolve(()=>{ - port1.close(); - }); + let unsub = new Promise(async (resolve, reject)=> { + let resolved = false; + port1.onmessage = async (m) => { + if (m.data.stream) { + if (!resolved) { + resolve(()=>{ + port1.close(); + }); + resolved = true; + } + await (callback)(m.data.ret); + } else if (!m.data.ok) { + if (!resolved) { + reject(new Error(m.data.ret)); + resolved= true; + } else { + throw new Error(m.data.ret); + } + } + }; }); - port1.onmessage = async (m) => { - if (m.data.stream) { - await (callback)(m.data.ret); - } else if (!m.data.ok) { - throw new Error(m.data.ret); - } - }; + //port2.onclose = () return unsub;