From 77b030260fad1c4758dfa494b4b77f713e9c1338 Mon Sep 17 00:00:00 2001 From: Jackson Morgan Date: Sun, 4 May 2025 18:19:15 -0400 Subject: [PATCH] Completed link query on connected-solid --- .../src/linkTraversal/ResourceLinkQuery.ts | 77 ++++++++++++++----- .../test/LinkTraversalIntegration.test.ts | 26 ++++++- 2 files changed, 78 insertions(+), 25 deletions(-) diff --git a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts index 514763d..3e96b23 100644 --- a/packages/connected/src/linkTraversal/ResourceLinkQuery.ts +++ b/packages/connected/src/linkTraversal/ResourceLinkQuery.ts @@ -29,6 +29,11 @@ export class ResourceLinkQuery< protected curOnDataChanged: nodeEventListener | undefined; + protected resourcesWithSubscriptionInProgress: Record< + string, + Promise | undefined + > = {}; + constructor( protected parentDataset: IConnectedLdoDataset, protected shapeType: ShapeType, @@ -79,37 +84,66 @@ export class ResourceLinkQuery< // Explore the links, with a subscription to re-explore the links if any // covered information changes - const resourcesCurrentlySubscribedTo = new Set( + const resourcesToUnsubscribeFrom = new Set( Object.keys(this.activeResourceSubscriptions), ); + + // Only add the listeners if we're currently subscribed + const exploreOptions = this.curOnDataChanged + ? { + onCoveredDataChanged: this.curOnDataChanged, + onResourceEncountered: async (resource) => { + console.log(`RESOURCE ENCOUNTERED! ${resource.uri}`); + // Wait for the the in progress registration to complete. Once it + // is complete, you're subscribed, so we can remove this from the + // resources to unsubscribe from. + if (this.resourcesWithSubscriptionInProgress[resource.uri]) { + console.log( + "Waiting on the subscription to finish.", + resource.uri, + ); + await this.resourcesWithSubscriptionInProgress[resource.uri]; + resourcesToUnsubscribeFrom.delete(resource.uri); + return; + } + // No need to do anything if we're already subscribed + if (resourcesToUnsubscribeFrom.has(resource.uri)) { + console.log(`No need to subscirbe to ${resource.uri}`); + resourcesToUnsubscribeFrom.delete(resource.uri); + return; + } + // Otherwise begin the subscription + console.log(`Subscirbing to ${resource.uri}`); + let resolve; + this.resourcesWithSubscriptionInProgress[resource.uri] = + new Promise((res) => { + resolve = res; + }); + const unsubscribeId = await resource.subscribeToNotifications(); + console.log(`Add to active subscriptions ${resource.uri}`); + this.activeResourceSubscriptions[resource.uri] = unsubscribeId; + // Unsubscribe in case unsubscribe call came in mid subscription + if (!this.curOnDataChanged) { + await this.unsubscribeFromResource(resource.uri); + } + resolve(); + this.resourcesWithSubscriptionInProgress[resource.uri] = + undefined; + }, + } + : {}; await exploreLinks( this.parentDataset, this.shapeType, this.startingResource, this.startingSubject, this.linkQueryInput, - { - onCoveredDataChanged: this.curOnDataChanged, - onResourceEncountered: async (resource) => { - console.log(`RESOURCE ENCOUNTERED! ${resource.uri}`); - // No need to do anything if we're already subscribed - if (resourcesCurrentlySubscribedTo.has(resource.uri)) { - console.log(`No need to subscirbe to ${resource.uri}`); - resourcesCurrentlySubscribedTo.delete(resource.uri); - return; - } - // Otherwise begin the subscription - console.log(`Subscirbing to ${resource.uri}`); - const unsubscribeId = await resource.subscribeToNotifications(); - console.log(`Add to active subscriptions ${resource.uri}`); - this.activeResourceSubscriptions[resource.uri] = unsubscribeId; - }, - }, + exploreOptions, ); // Clean up unused subscriptions - console.log("Cleaning these up", resourcesCurrentlySubscribedTo); + console.log("Cleaning these up", resourcesToUnsubscribeFrom); await Promise.all( - Array.from(resourcesCurrentlySubscribedTo).map(async (uri) => + Array.from(resourcesToUnsubscribeFrom).map(async (uri) => this.unsubscribeFromResource(uri), ), ); @@ -119,6 +153,7 @@ export class ResourceLinkQuery< } private async unsubscribeFromResource(uri) { + console.log(`Unsubscribing from ${uri}`); const resource = this.parentDataset.getResource(uri); const unsubscribeId = this.activeResourceSubscriptions[uri]; delete this.activeResourceSubscriptions[uri]; @@ -126,7 +161,7 @@ export class ResourceLinkQuery< } private async fullUnsubscribe(): Promise { - console.log("Unsubscribing"); + console.log("Full Unsubscribing"); if (this.curOnDataChanged) { this.parentDataset.removeListenerFromAllEvents(this.curOnDataChanged); this.curOnDataChanged = undefined; diff --git a/packages/connected/test/LinkTraversalIntegration.test.ts b/packages/connected/test/LinkTraversalIntegration.test.ts index 18c630c..b03b31c 100644 --- a/packages/connected/test/LinkTraversalIntegration.test.ts +++ b/packages/connected/test/LinkTraversalIntegration.test.ts @@ -15,7 +15,6 @@ import { } from "./LinkTraversalData"; import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes"; import { wait } from "./util/wait"; -import { inspect } from "util"; describe("Link Traversal", () => { // eslint-disable-next-line @typescript-eslint/ban-ts-comment @@ -54,15 +53,15 @@ describe("Link Traversal", () => { it("handles subscriptions if data changes locally", async () => { const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); - await solidLdoDataset + const linkQuery = solidLdoDataset .usingType(SolidProfileShapeShapeType) .startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, { name: true, knows: { name: true, }, - }) - .subscribe(); + }); + await linkQuery.subscribe(); // Should have regular information let mainProfile = solidLdoDataset @@ -104,6 +103,8 @@ describe("Link Traversal", () => { const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name); expect(knowNames).toContain("Other User"); expect(knowNames).toContain("Third User"); + + // Unsubscribe }); it.only("handles subscriptions if data changes on the Pod", async () => { @@ -136,6 +137,7 @@ describe("Link Traversal", () => { let subscribedResources = linkQuery .getSubscribedResources() .map((resource) => resource.uri); + console.log("Subscribed to resources 1", subscribedResources); expect(subscribedResources.length).toBe(2); expect(subscribedResources).toContain(MAIN_PROFILE_URI); expect(subscribedResources).toContain(OTHER_PROFILE_URI); @@ -201,5 +203,21 @@ describe("Link Traversal", () => { .map((resource) => resource.uri); console.log("Subscribed Resources", subscribedResources); expect(subscribedResources.length).toBe(0); + + console.log("TIME FOR SOME ADDITIONAL TESTS ============================="); + + // Check that all resources are unsubscribed from notifications + const resources = solidLdoDataset.getResources(); + resources.forEach((resource) => { + expect(resource.isSubscribedToNotifications()).toBe(false); + }); + + const cMainProfile = changeData(mainProfile, mainProfileResource); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore + cMainProfile.knows?.add({ + "@id": "http://localhost:3005/test-container/fifthProfile.ttl#me", + }); + await commitData(cMainProfile); }); });