From 0ac81bb1bf3f2652dc818bf3e1a6715e649fb9dc Mon Sep 17 00:00:00 2001 From: Jackson Morgan Date: Sat, 3 May 2025 22:02:14 -0400 Subject: [PATCH] Completed the Subscription code --- packages/connected/package.json | 4 +- .../src/linkTraversal/ResourceLinkQuery.ts | 44 ++++++++++++++++--- .../src/linkTraversal/exploreLinks.ts | 4 +- .../test/LinkTraversalIntegration.test.ts | 20 ++++----- 4 files changed, 53 insertions(+), 19 deletions(-) diff --git a/packages/connected/package.json b/packages/connected/package.json index 8f6b378..4836178 100644 --- a/packages/connected/package.json +++ b/packages/connected/package.json @@ -6,7 +6,7 @@ "scripts": { "build": "tsc --project tsconfig.build.json", "watch": "tsc --watch", - "test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage -t \"handles subscriptions if data changes\"", + "test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage", "test:watch": "jest --watch", "prepublishOnly": "npm run test && npm run build", "lint": "eslint src/** --fix --no-error-on-unmatched-pattern", @@ -48,4 +48,4 @@ "publishConfig": { "access": "public" } -} +} \ No newline at end of file diff --git a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts index 0e6f287..79d9a83 100644 --- a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts +++ b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts @@ -22,11 +22,13 @@ export class ResourceLinkQuery< { protected trackedResources: Set = new Set(); - // uri -> unsubscribeId - protected resourceUnsubscribeIds: Record = {}; - protected thisUnsubscribeIds: Set = new Set(); protected previousTransactionId: string = "INIT"; + // Resource Subscriptions uri -> unsubscribeId + protected activeResourceSubscriptions: Record = {}; + // Unsubscribe IDs for this ResourceLinkQuery + protected thisUnsubscribeIds = new Set(); + constructor( protected parentDataset: IConnectedLdoDataset, protected shapeType: ShapeType, @@ -71,6 +73,9 @@ export class ResourceLinkQuery< // Explore the links, with a subscription to re-explore the links if any // covered information changes + const resourcesCurrentlySubscribedTo = new Set( + Object.keys(this.activeResourceSubscriptions), + ); await exploreLinks( this.parentDataset, this.shapeType, @@ -79,15 +84,44 @@ export class ResourceLinkQuery< this.linkQueryInput, { onCoveredDataChanged: onDataChanged, + onResourceEncountered: async (resource) => { + // No need to do anything if we're already subscribed + if (resourcesCurrentlySubscribedTo.has(resource.uri)) { + console.log(`No need to subscirbe to ${resource.uri}`); + resourcesCurrentlySubscribedTo.delete(resource.uri); + return; + } + // Otherwise begin the subscription + console.log(`Subscirbing to ${resource.uri}`); + const unsubscribeId = await resource.subscribeToNotifications(); + this.activeResourceSubscriptions[resource.uri] = unsubscribeId; + }, }, ); + // Clean up unused subscriptions + await Promise.all( + Array.from(resourcesCurrentlySubscribedTo).map(async (uri) => + this.unsubscribeFromResource(uri), + ), + ); }; await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]); return subscriptionId; } + private async unsubscribeFromResource(uri) { + const resource = this.parentDataset.getResource(uri); + const unsubscribeId = this.activeResourceSubscriptions[uri]; + delete this.activeResourceSubscriptions[uri]; + await resource.unsubscribeFromNotifications(unsubscribeId); + } + private async fullUnsubscribe(): Promise { - // TODO + await Promise.all( + Object.keys(this.activeResourceSubscriptions).map(async (uri) => + this.unsubscribeFromResource(uri), + ), + ); } async unsubscribe(unsubscribeId: string): Promise { @@ -104,7 +138,7 @@ export class ResourceLinkQuery< } getSubscribedResources(): Plugins[number]["types"]["resource"][] { - return Object.keys(this.resourceUnsubscribeIds).map((uri) => + return Object.keys(this.activeResourceSubscriptions).map((uri) => this.parentDataset.getResource(uri), ); } diff --git a/packages/connected/src/linkTraversal/exploreLinks.ts b/packages/connected/src/linkTraversal/exploreLinks.ts index 5cc34dd..71969ec 100644 --- a/packages/connected/src/linkTraversal/exploreLinks.ts +++ b/packages/connected/src/linkTraversal/exploreLinks.ts @@ -11,7 +11,7 @@ import type { Quad } from "@rdfjs/types"; interface ExploreLinksOptions { onResourceEncountered?: ( resource: Plugins[number]["types"]["resource"], - ) => void; + ) => Promise; onCoveredDataChanged?: nodeEventListener; shouldRefreshResources?: boolean; } @@ -34,7 +34,7 @@ export async function exploreLinks< if (readResult.isError) return; if (options?.onResourceEncountered) - options?.onResourceEncountered(startingResource); + await options?.onResourceEncountered(startingResource); const proxyBuilder = options?.onCoveredDataChanged ? createTrackingProxyBuilder( diff --git a/packages/connected/test/LinkTraversalIntegration.test.ts b/packages/connected/test/LinkTraversalIntegration.test.ts index 060dfd7..89c3164 100644 --- a/packages/connected/test/LinkTraversalIntegration.test.ts +++ b/packages/connected/test/LinkTraversalIntegration.test.ts @@ -105,7 +105,7 @@ describe("Link Traversal", () => { expect(knowNames).toContain("Third User"); }); - it("handles subscriptions if data changes on the Pod", async () => { + it.only("handles subscriptions if data changes on the Pod", async () => { const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); await solidLdoDataset .usingType(SolidProfileShapeShapeType) @@ -133,15 +133,15 @@ describe("Link Traversal", () => { console.log("=================="); - // Update to include a new document - const cMainProfile = changeData(mainProfile, mainProfileResource); - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore - cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT }); - await commitData(cMainProfile); - - // Wait for 200ms to allow the other file to be fetched - await wait(200); + // Update data on the Pod + await s.authFetch(MAIN_PROFILE_URI, { + method: "PATCH", + body: "INSERT DATA { . }", + headers: { + "Content-Type": "application/sparql-update", + }, + }); + await wait(1000); // After the data is committed, the third profile should be present mainProfile = solidLdoDataset