Completed link query on connected-solid

main
Jackson Morgan 4 months ago
parent 7ed9ba5795
commit 77b030260f
  1. 77
      packages/connected/src/linkTraversal/ResourceLinkQuery.ts
  2. 26
      packages/connected/test/LinkTraversalIntegration.test.ts

@ -29,6 +29,11 @@ export class ResourceLinkQuery<
protected curOnDataChanged: nodeEventListener<Quad> | undefined;
protected resourcesWithSubscriptionInProgress: Record<
string,
Promise<void> | undefined
> = {};
constructor(
protected parentDataset: IConnectedLdoDataset<Plugins>,
protected shapeType: ShapeType<Type>,
@ -79,37 +84,66 @@ export class ResourceLinkQuery<
// Explore the links, with a subscription to re-explore the links if any
// covered information changes
const resourcesCurrentlySubscribedTo = new Set(
const resourcesToUnsubscribeFrom = new Set(
Object.keys(this.activeResourceSubscriptions),
);
// Only add the listeners if we're currently subscribed
const exploreOptions = this.curOnDataChanged
? {
onCoveredDataChanged: this.curOnDataChanged,
onResourceEncountered: async (resource) => {
console.log(`RESOURCE ENCOUNTERED! ${resource.uri}`);
// Wait for the the in progress registration to complete. Once it
// is complete, you're subscribed, so we can remove this from the
// resources to unsubscribe from.
if (this.resourcesWithSubscriptionInProgress[resource.uri]) {
console.log(
"Waiting on the subscription to finish.",
resource.uri,
);
await this.resourcesWithSubscriptionInProgress[resource.uri];
resourcesToUnsubscribeFrom.delete(resource.uri);
return;
}
// No need to do anything if we're already subscribed
if (resourcesToUnsubscribeFrom.has(resource.uri)) {
console.log(`No need to subscirbe to ${resource.uri}`);
resourcesToUnsubscribeFrom.delete(resource.uri);
return;
}
// Otherwise begin the subscription
console.log(`Subscirbing to ${resource.uri}`);
let resolve;
this.resourcesWithSubscriptionInProgress[resource.uri] =
new Promise<void>((res) => {
resolve = res;
});
const unsubscribeId = await resource.subscribeToNotifications();
console.log(`Add to active subscriptions ${resource.uri}`);
this.activeResourceSubscriptions[resource.uri] = unsubscribeId;
// Unsubscribe in case unsubscribe call came in mid subscription
if (!this.curOnDataChanged) {
await this.unsubscribeFromResource(resource.uri);
}
resolve();
this.resourcesWithSubscriptionInProgress[resource.uri] =
undefined;
},
}
: {};
await exploreLinks(
this.parentDataset,
this.shapeType,
this.startingResource,
this.startingSubject,
this.linkQueryInput,
{
onCoveredDataChanged: this.curOnDataChanged,
onResourceEncountered: async (resource) => {
console.log(`RESOURCE ENCOUNTERED! ${resource.uri}`);
// No need to do anything if we're already subscribed
if (resourcesCurrentlySubscribedTo.has(resource.uri)) {
console.log(`No need to subscirbe to ${resource.uri}`);
resourcesCurrentlySubscribedTo.delete(resource.uri);
return;
}
// Otherwise begin the subscription
console.log(`Subscirbing to ${resource.uri}`);
const unsubscribeId = await resource.subscribeToNotifications();
console.log(`Add to active subscriptions ${resource.uri}`);
this.activeResourceSubscriptions[resource.uri] = unsubscribeId;
},
},
exploreOptions,
);
// Clean up unused subscriptions
console.log("Cleaning these up", resourcesCurrentlySubscribedTo);
console.log("Cleaning these up", resourcesToUnsubscribeFrom);
await Promise.all(
Array.from(resourcesCurrentlySubscribedTo).map(async (uri) =>
Array.from(resourcesToUnsubscribeFrom).map(async (uri) =>
this.unsubscribeFromResource(uri),
),
);
@ -119,6 +153,7 @@ export class ResourceLinkQuery<
}
private async unsubscribeFromResource(uri) {
console.log(`Unsubscribing from ${uri}`);
const resource = this.parentDataset.getResource(uri);
const unsubscribeId = this.activeResourceSubscriptions[uri];
delete this.activeResourceSubscriptions[uri];
@ -126,7 +161,7 @@ export class ResourceLinkQuery<
}
private async fullUnsubscribe(): Promise<void> {
console.log("Unsubscribing");
console.log("Full Unsubscribing");
if (this.curOnDataChanged) {
this.parentDataset.removeListenerFromAllEvents(this.curOnDataChanged);
this.curOnDataChanged = undefined;

@ -15,7 +15,6 @@ import {
} from "./LinkTraversalData";
import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes";
import { wait } from "./util/wait";
import { inspect } from "util";
describe("Link Traversal", () => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -54,15 +53,15 @@ describe("Link Traversal", () => {
it("handles subscriptions if data changes locally", async () => {
const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI);
await solidLdoDataset
const linkQuery = solidLdoDataset
.usingType(SolidProfileShapeShapeType)
.startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, {
name: true,
knows: {
name: true,
},
})
.subscribe();
});
await linkQuery.subscribe();
// Should have regular information
let mainProfile = solidLdoDataset
@ -104,6 +103,8 @@ describe("Link Traversal", () => {
const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name);
expect(knowNames).toContain("Other User");
expect(knowNames).toContain("Third User");
// Unsubscribe
});
it.only("handles subscriptions if data changes on the Pod", async () => {
@ -136,6 +137,7 @@ describe("Link Traversal", () => {
let subscribedResources = linkQuery
.getSubscribedResources()
.map((resource) => resource.uri);
console.log("Subscribed to resources 1", subscribedResources);
expect(subscribedResources.length).toBe(2);
expect(subscribedResources).toContain(MAIN_PROFILE_URI);
expect(subscribedResources).toContain(OTHER_PROFILE_URI);
@ -201,5 +203,21 @@ describe("Link Traversal", () => {
.map((resource) => resource.uri);
console.log("Subscribed Resources", subscribedResources);
expect(subscribedResources.length).toBe(0);
console.log("TIME FOR SOME ADDITIONAL TESTS =============================");
// Check that all resources are unsubscribed from notifications
const resources = solidLdoDataset.getResources();
resources.forEach((resource) => {
expect(resource.isSubscribedToNotifications()).toBe(false);
});
const cMainProfile = changeData(mainProfile, mainProfileResource);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
cMainProfile.knows?.add({
"@id": "http://localhost:3005/test-container/fifthProfile.ttl#me",
});
await commitData(cMainProfile);
});
});

Loading…
Cancel
Save