From 46be390649c9d80e7ca1437c8831ae8a7aba55c5 Mon Sep 17 00:00:00 2001 From: Jackson Morgan Date: Sat, 3 May 2025 18:43:35 -0400 Subject: [PATCH] Subscriptions work locally --- package-lock.json | 41 ++++++++++++- packages/connected/package.json | 5 +- .../src/linkTraversal/ResourceLinkQuery.ts | 46 +++++++++++--- .../src/linkTraversal/exploreLinks.ts | 25 +++++--- packages/connected/test/LinkTraversalData.ts | 2 +- .../test/LinkTraversalIntegration.test.ts | 60 ++++++++++++++++++- packages/connected/test/util/wait.ts | 3 + packages/subscribable-dataset/package.json | 4 +- .../src/SubscribableDataset.ts | 3 + packages/subscribable-dataset/src/types.ts | 1 + .../test/SubscribableDataset.test.ts | 10 ++-- 11 files changed, 170 insertions(+), 30 deletions(-) create mode 100644 packages/connected/test/util/wait.ts diff --git a/package-lock.json b/package-lock.json index 308c357..919233d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -25939,7 +25939,8 @@ "ts-node": "^10.9.1", "typed-emitter": "^2.1.0", "typedoc": "^0.25.4", - "typedoc-plugin-markdown": "^3.17.1" + "typedoc-plugin-markdown": "^3.17.1", + "uuid": "^11.1.0" } }, "packages/connected-nextgraph": { @@ -26003,6 +26004,20 @@ "typedoc-plugin-markdown": "^3.17.1" } }, + "packages/connected/node_modules/uuid": { + "version": "11.1.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", + "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", + "dev": true, + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "packages/dataset": { "name": "@ldo/dataset", "version": "1.0.0-alpha.3", @@ -26287,7 +26302,8 @@ "license": "MIT", "dependencies": { "@ldo/dataset": "^1.0.0-alpha.3", - "@ldo/rdf-utils": "^1.0.0-alpha.3" + "@ldo/rdf-utils": "^1.0.0-alpha.3", + "uuid": "^11.1.0" }, "devDependencies": { "@rdfjs/data-model": "^1.2.0", @@ -26295,9 +26311,17 @@ "@rdfjs/types": "^1.0.1", "@types/jsonld": "^1.5.6", "@types/rdfjs__dataset": "^1.0.4", + "@types/uuid": "^10.0.0", "ts-node": "^9.1.1" } }, + "packages/subscribable-dataset/node_modules/@types/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==", + "dev": true, + "license": "MIT" + }, "packages/subscribable-dataset/node_modules/source-map-support": { "version": "0.5.21", "dev": true, @@ -26332,6 +26356,19 @@ "typescript": ">=2.7" } }, + "packages/subscribable-dataset/node_modules/uuid": { + "version": "11.1.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", + "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "license": "MIT", + "bin": { + "uuid": "dist/esm/bin/uuid" + } + }, "packages/test-solid-server": { "name": "@ldo/test-solid-server", "version": "1.0.0-alpha.8", diff --git a/packages/connected/package.json b/packages/connected/package.json index ee2bb0f..8f6b378 100644 --- a/packages/connected/package.json +++ b/packages/connected/package.json @@ -33,7 +33,8 @@ "ts-node": "^10.9.1", "typed-emitter": "^2.1.0", "typedoc": "^0.25.4", - "typedoc-plugin-markdown": "^3.17.1" + "typedoc-plugin-markdown": "^3.17.1", + "uuid": "^11.1.0" }, "dependencies": { "@ldo/dataset": "^1.0.0-alpha.3", @@ -47,4 +48,4 @@ "publishConfig": { "access": "public" } -} \ No newline at end of file +} diff --git a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts index 5d7424e..0e6f287 100644 --- a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts +++ b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts @@ -10,6 +10,9 @@ import type { SubjectNode } from "@ldo/rdf-utils"; import { exploreLinks } from "./exploreLinks"; import type { IConnectedLdoDataset } from "../types/IConnectedLdoDataset"; import type { IConnectedLdoBuilder } from "../types/IConnectedLdoBuilder"; +import { v4 } from "uuid"; +import type { nodeEventListener } from "@ldo/subscribable-dataset"; +import type { Quad } from "@rdfjs/types"; export class ResourceLinkQuery< Type extends LdoBase, @@ -22,6 +25,7 @@ export class ResourceLinkQuery< // uri -> unsubscribeId protected resourceUnsubscribeIds: Record = {}; protected thisUnsubscribeIds: Set = new Set(); + protected previousTransactionId: string = "INIT"; constructor( protected parentDataset: IConnectedLdoDataset, @@ -47,15 +51,39 @@ export class ResourceLinkQuery< } async subscribe(): Promise { - await exploreLinks( - this.parentDataset, - this.shapeType, - this.startingResource, - this.startingSubject, - this.linkQueryInput, - {}, - ); - return "string"; + const subscriptionId = v4(); + const onDataChanged: nodeEventListener = async ( + _changes, + transactionId: string, + _triggering, + ) => { + console.log( + `Transaction ID: ${transactionId}\ntriggering: [${_triggering[0] + ?.value}, ${_triggering[1]?.value}, ${_triggering[2] + ?.value}, ${_triggering[3] + ?.value}]\nadded: ${_changes.added?.toString()}\nremoved:${_changes.removed?.toString()}`, + ); + // Set a transaction Id, so that we only trigger one re-render + if (transactionId === this.previousTransactionId) return; + this.previousTransactionId = transactionId; + // Remove previous registration + this.parentDataset.removeListenerFromAllEvents(onDataChanged); + + // Explore the links, with a subscription to re-explore the links if any + // covered information changes + await exploreLinks( + this.parentDataset, + this.shapeType, + this.startingResource, + this.startingSubject, + this.linkQueryInput, + { + onCoveredDataChanged: onDataChanged, + }, + ); + }; + await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]); + return subscriptionId; } private async fullUnsubscribe(): Promise { diff --git a/packages/connected/src/linkTraversal/exploreLinks.ts b/packages/connected/src/linkTraversal/exploreLinks.ts index 29c23c6..5cc34dd 100644 --- a/packages/connected/src/linkTraversal/exploreLinks.ts +++ b/packages/connected/src/linkTraversal/exploreLinks.ts @@ -5,11 +5,14 @@ import type { LQInput } from "../types/ILinkQuery"; import { BasicLdSet } from "@ldo/jsonld-dataset-proxy"; import type { IConnectedLdoDataset } from "../types/IConnectedLdoDataset"; import { createTrackingProxyBuilder } from "../trackingProxy/createTrackingProxy"; +import type { nodeEventListener } from "@ldo/subscribable-dataset"; +import type { Quad } from "@rdfjs/types"; interface ExploreLinksOptions { onResourceEncountered?: ( resource: Plugins[number]["types"]["resource"], ) => void; + onCoveredDataChanged?: nodeEventListener; shouldRefreshResources?: boolean; } @@ -30,15 +33,17 @@ export async function exploreLinks< : await startingResource.readIfUnfetched(); if (readResult.isError) return; - const trackingProxyBuilder = createTrackingProxyBuilder( - dataset, - shapeType, - (changes) => - console.log( - `Got Update \nadded: ${changes.added?.toString()}\nremoved: ${changes.removed?.toString()}`, - ), - ); - const ldObject = trackingProxyBuilder.fromSubject(startingSubject); + if (options?.onResourceEncountered) + options?.onResourceEncountered(startingResource); + + const proxyBuilder = options?.onCoveredDataChanged + ? createTrackingProxyBuilder( + dataset, + shapeType, + options?.onCoveredDataChanged, + ) + : dataset.usingType(shapeType); + const ldObject = proxyBuilder.fromSubject(startingSubject); const fetchedDuringThisExploration = new Set([startingResource.uri]); @@ -77,6 +82,8 @@ export async function exploreLinksRecursive< if (readResult.isError) { return; } + if (options?.onResourceEncountered) + options.onResourceEncountered(resourceToFetch); fetchedDuringThisExploration.add(resourceToFetch.uri); } // Recurse through the other elemenets diff --git a/packages/connected/test/LinkTraversalData.ts b/packages/connected/test/LinkTraversalData.ts index ab8010a..d81879b 100644 --- a/packages/connected/test/LinkTraversalData.ts +++ b/packages/connected/test/LinkTraversalData.ts @@ -5,7 +5,7 @@ export const MAIN_PROFILE_URI = `${BASE_CONTAINER}mainProfile.ttl`; export const MAIN_PROFILE_SUBJECT = `${MAIN_PROFILE_URI}#me`; export const OTHER_PROFILE_URI = `${BASE_CONTAINER}otherProfile.ttl`; export const OTHER_PROFILE_SUBJECT = `${OTHER_PROFILE_URI}#me`; -export const THIRD_PROFILE_URI = `${BASE_CONTAINER}otherProfile.ttl`; +export const THIRD_PROFILE_URI = `${BASE_CONTAINER}thirdProfile.ttl`; export const THIRD_PROFILE_SUBJECT = `${THIRD_PROFILE_URI}#me`; export const linkTraversalData: ResourceInfo = { diff --git a/packages/connected/test/LinkTraversalIntegration.test.ts b/packages/connected/test/LinkTraversalIntegration.test.ts index ef54db9..060dfd7 100644 --- a/packages/connected/test/LinkTraversalIntegration.test.ts +++ b/packages/connected/test/LinkTraversalIntegration.test.ts @@ -14,6 +14,7 @@ import { THIRD_PROFILE_URI, } from "./LinkTraversalData"; import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes"; +import { wait } from "./util/wait"; describe("Link Traversal", () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment @@ -50,7 +51,61 @@ describe("Link Traversal", () => { expect(data.knows?.toArray()[0].name).toBe("Other User"); }); - it("handles subscriptions if data changes", async () => { + it("handles subscriptions if data changes locally", async () => { + const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); + await solidLdoDataset + .usingType(SolidProfileShapeShapeType) + .startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, { + name: true, + knows: { + name: true, + }, + }) + .subscribe(); + + // Should have regular information + let mainProfile = solidLdoDataset + .usingType(SolidProfileShapeShapeType) + .fromSubject(MAIN_PROFILE_SUBJECT); + let resourceUris = solidLdoDataset + .getResources() + .map((resource) => resource.uri); + expect(resourceUris.length).toBe(3); + expect(resourceUris).toContain(MAIN_PROFILE_URI); + expect(resourceUris).toContain(OTHER_PROFILE_URI); + expect(mainProfile.name).toBe("Main User"); + expect(mainProfile.knows?.size).toBe(1); + expect(mainProfile.knows?.toArray()[0].name).toBe("Other User"); + + // Update to include a new document + const cMainProfile = changeData(mainProfile, mainProfileResource); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT }); + await commitData(cMainProfile); + + // Wait for 200ms to allow the other file to be fetched + await wait(200); + + // After the data is committed, the third profile should be present + mainProfile = solidLdoDataset + .usingType(SolidProfileShapeShapeType) + .fromSubject(MAIN_PROFILE_SUBJECT); + resourceUris = solidLdoDataset + .getResources() + .map((resource) => resource.uri); + expect(resourceUris.length).toBe(4); + expect(resourceUris).toContain(MAIN_PROFILE_URI); + expect(resourceUris).toContain(OTHER_PROFILE_URI); + expect(resourceUris).toContain(THIRD_PROFILE_URI); + expect(mainProfile.name).toBe("Main User"); + expect(mainProfile.knows?.size).toBe(2); + const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name); + expect(knowNames).toContain("Other User"); + expect(knowNames).toContain("Third User"); + }); + + it("handles subscriptions if data changes on the Pod", async () => { const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); await solidLdoDataset .usingType(SolidProfileShapeShapeType) @@ -85,6 +140,9 @@ describe("Link Traversal", () => { cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT }); await commitData(cMainProfile); + // Wait for 200ms to allow the other file to be fetched + await wait(200); + // After the data is committed, the third profile should be present mainProfile = solidLdoDataset .usingType(SolidProfileShapeShapeType) diff --git a/packages/connected/test/util/wait.ts b/packages/connected/test/util/wait.ts new file mode 100644 index 0000000..be2bee6 --- /dev/null +++ b/packages/connected/test/util/wait.ts @@ -0,0 +1,3 @@ +export async function wait(time: number) { + return new Promise((resolve) => setTimeout(resolve, time)); +} diff --git a/packages/subscribable-dataset/package.json b/packages/subscribable-dataset/package.json index d5ed917..43874a6 100644 --- a/packages/subscribable-dataset/package.json +++ b/packages/subscribable-dataset/package.json @@ -27,11 +27,13 @@ "@rdfjs/types": "^1.0.1", "@types/jsonld": "^1.5.6", "@types/rdfjs__dataset": "^1.0.4", + "@types/uuid": "^10.0.0", "ts-node": "^9.1.1" }, "dependencies": { "@ldo/dataset": "^1.0.0-alpha.3", - "@ldo/rdf-utils": "^1.0.0-alpha.3" + "@ldo/rdf-utils": "^1.0.0-alpha.3", + "uuid": "^11.1.0" }, "files": [ "dist", diff --git a/packages/subscribable-dataset/src/SubscribableDataset.ts b/packages/subscribable-dataset/src/SubscribableDataset.ts index f33a543..bb39b51 100644 --- a/packages/subscribable-dataset/src/SubscribableDataset.ts +++ b/packages/subscribable-dataset/src/SubscribableDataset.ts @@ -16,6 +16,7 @@ import type { ITransactionDatasetFactory, } from "./types"; import { ExtendedDataset } from "@ldo/dataset"; +import { v4 } from "uuid"; /** * A wrapper for a dataset that allows subscriptions to be made on nodes to @@ -239,12 +240,14 @@ export class SubscribableDataset populateMatchingDatasetChanges("added"); populateMatchingDatasetChanges("removed"); + const transactionId = v4(); // Alert all listeners Object.entries(matchingDatasetChanges).forEach( ([quadMatchString, info]) => { this.eventEmitter.emit( quadMatchString, info.changes, + transactionId, info.triggerQuadMatch, ); }, diff --git a/packages/subscribable-dataset/src/types.ts b/packages/subscribable-dataset/src/types.ts index 1ed563f..ce3d64d 100644 --- a/packages/subscribable-dataset/src/types.ts +++ b/packages/subscribable-dataset/src/types.ts @@ -6,6 +6,7 @@ import type { Dataset, BaseQuad, DatasetFactory } from "@rdfjs/types"; */ export type nodeEventListener = ( changes: DatasetChanges, + transactionId: string, triggeringQuadMatch: QuadMatch, ) => void; diff --git a/packages/subscribable-dataset/test/SubscribableDataset.test.ts b/packages/subscribable-dataset/test/SubscribableDataset.test.ts index 8429efb..5454c18 100644 --- a/packages/subscribable-dataset/test/SubscribableDataset.test.ts +++ b/packages/subscribable-dataset/test/SubscribableDataset.test.ts @@ -62,7 +62,7 @@ describe("SubscribableDataset", () => { expect(callbackFunc).toBeCalledTimes(1); expect(callbackFunc.mock.calls[0][0].added.size).toBe(1); expect(callbackFunc.mock.calls[0][0].added.has(tomColorQuad)).toBe(true); - expect(callbackFunc.mock.calls[0][1]).toEqual([ + expect(callbackFunc.mock.calls[0][2]).toEqual([ namedNode("http://example.org/cartoons#Tom"), null, null, @@ -80,7 +80,7 @@ describe("SubscribableDataset", () => { expect(callbackFunc).toBeCalledTimes(1); expect(callbackFunc.mock.calls[0][0].removed.size).toBe(1); expect(callbackFunc.mock.calls[0][0].removed.has(tomTypeQuad)).toBe(true); - expect(callbackFunc.mock.calls[0][1]).toEqual([ + expect(callbackFunc.mock.calls[0][2]).toEqual([ namedNode("http://example.org/cartoons#Tom"), null, null, @@ -99,7 +99,7 @@ describe("SubscribableDataset", () => { expect(callbackFunc.mock.calls[0][0].added.size).toBe(2); expect(callbackFunc.mock.calls[0][0].added.has(lickyNameQuad)).toBe(true); expect(callbackFunc.mock.calls[0][0].added.has(lickyTypeQuad)).toBe(true); - expect(callbackFunc.mock.calls[0][1]).toEqual([ + expect(callbackFunc.mock.calls[0][2]).toEqual([ namedNode("http://example.org/cartoons#Licky"), null, null, @@ -123,7 +123,7 @@ describe("SubscribableDataset", () => { ), ).toBe(true); expect(callbackFuncLicky.mock.calls[0][0].removed).toBe(undefined); - expect(callbackFuncLicky.mock.calls[0][1]).toEqual([ + expect(callbackFuncLicky.mock.calls[0][2]).toEqual([ namedNode("http://example.org/cartoons#Licky"), null, null, @@ -147,7 +147,7 @@ describe("SubscribableDataset", () => { ), ).toBe(true); expect(callbackFuncTom.mock.calls[0][0].added).toBe(undefined); - expect(callbackFuncTom.mock.calls[0][1]).toEqual([ + expect(callbackFuncTom.mock.calls[0][2]).toEqual([ namedNode("http://example.org/cartoons#Tom"), null, null,