Completed the Subscription code

main
Jackson Morgan 4 months ago
parent 46be390649
commit 0ac81bb1bf
  1. 4
      packages/connected/package.json
  2. 44
      packages/connected/src/linkTraversal/ResourceLinkQuery.ts
  3. 4
      packages/connected/src/linkTraversal/exploreLinks.ts
  4. 20
      packages/connected/test/LinkTraversalIntegration.test.ts

@ -6,7 +6,7 @@
"scripts": { "scripts": {
"build": "tsc --project tsconfig.build.json", "build": "tsc --project tsconfig.build.json",
"watch": "tsc --watch", "watch": "tsc --watch",
"test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage -t \"handles subscriptions if data changes\"", "test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage",
"test:watch": "jest --watch", "test:watch": "jest --watch",
"prepublishOnly": "npm run test && npm run build", "prepublishOnly": "npm run test && npm run build",
"lint": "eslint src/** --fix --no-error-on-unmatched-pattern", "lint": "eslint src/** --fix --no-error-on-unmatched-pattern",
@ -48,4 +48,4 @@
"publishConfig": { "publishConfig": {
"access": "public" "access": "public"
} }
} }

@ -22,11 +22,13 @@ export class ResourceLinkQuery<
{ {
protected trackedResources: Set<Plugins[number]["types"]["resource"]> = protected trackedResources: Set<Plugins[number]["types"]["resource"]> =
new Set(); new Set();
// uri -> unsubscribeId
protected resourceUnsubscribeIds: Record<string, string> = {};
protected thisUnsubscribeIds: Set<string> = new Set();
protected previousTransactionId: string = "INIT"; protected previousTransactionId: string = "INIT";
// Resource Subscriptions uri -> unsubscribeId
protected activeResourceSubscriptions: Record<string, string> = {};
// Unsubscribe IDs for this ResourceLinkQuery
protected thisUnsubscribeIds = new Set<string>();
constructor( constructor(
protected parentDataset: IConnectedLdoDataset<Plugins>, protected parentDataset: IConnectedLdoDataset<Plugins>,
protected shapeType: ShapeType<Type>, protected shapeType: ShapeType<Type>,
@ -71,6 +73,9 @@ export class ResourceLinkQuery<
// Explore the links, with a subscription to re-explore the links if any // Explore the links, with a subscription to re-explore the links if any
// covered information changes // covered information changes
const resourcesCurrentlySubscribedTo = new Set(
Object.keys(this.activeResourceSubscriptions),
);
await exploreLinks( await exploreLinks(
this.parentDataset, this.parentDataset,
this.shapeType, this.shapeType,
@ -79,15 +84,44 @@ export class ResourceLinkQuery<
this.linkQueryInput, this.linkQueryInput,
{ {
onCoveredDataChanged: onDataChanged, onCoveredDataChanged: onDataChanged,
onResourceEncountered: async (resource) => {
// No need to do anything if we're already subscribed
if (resourcesCurrentlySubscribedTo.has(resource.uri)) {
console.log(`No need to subscirbe to ${resource.uri}`);
resourcesCurrentlySubscribedTo.delete(resource.uri);
return;
}
// Otherwise begin the subscription
console.log(`Subscirbing to ${resource.uri}`);
const unsubscribeId = await resource.subscribeToNotifications();
this.activeResourceSubscriptions[resource.uri] = unsubscribeId;
},
}, },
); );
// Clean up unused subscriptions
await Promise.all(
Array.from(resourcesCurrentlySubscribedTo).map(async (uri) =>
this.unsubscribeFromResource(uri),
),
);
}; };
await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]); await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]);
return subscriptionId; return subscriptionId;
} }
private async unsubscribeFromResource(uri) {
const resource = this.parentDataset.getResource(uri);
const unsubscribeId = this.activeResourceSubscriptions[uri];
delete this.activeResourceSubscriptions[uri];
await resource.unsubscribeFromNotifications(unsubscribeId);
}
private async fullUnsubscribe(): Promise<void> { private async fullUnsubscribe(): Promise<void> {
// TODO await Promise.all(
Object.keys(this.activeResourceSubscriptions).map(async (uri) =>
this.unsubscribeFromResource(uri),
),
);
} }
async unsubscribe(unsubscribeId: string): Promise<void> { async unsubscribe(unsubscribeId: string): Promise<void> {
@ -104,7 +138,7 @@ export class ResourceLinkQuery<
} }
getSubscribedResources(): Plugins[number]["types"]["resource"][] { getSubscribedResources(): Plugins[number]["types"]["resource"][] {
return Object.keys(this.resourceUnsubscribeIds).map((uri) => return Object.keys(this.activeResourceSubscriptions).map((uri) =>
this.parentDataset.getResource(uri), this.parentDataset.getResource(uri),
); );
} }

@ -11,7 +11,7 @@ import type { Quad } from "@rdfjs/types";
interface ExploreLinksOptions<Plugins extends ConnectedPlugin[]> { interface ExploreLinksOptions<Plugins extends ConnectedPlugin[]> {
onResourceEncountered?: ( onResourceEncountered?: (
resource: Plugins[number]["types"]["resource"], resource: Plugins[number]["types"]["resource"],
) => void; ) => Promise<void>;
onCoveredDataChanged?: nodeEventListener<Quad>; onCoveredDataChanged?: nodeEventListener<Quad>;
shouldRefreshResources?: boolean; shouldRefreshResources?: boolean;
} }
@ -34,7 +34,7 @@ export async function exploreLinks<
if (readResult.isError) return; if (readResult.isError) return;
if (options?.onResourceEncountered) if (options?.onResourceEncountered)
options?.onResourceEncountered(startingResource); await options?.onResourceEncountered(startingResource);
const proxyBuilder = options?.onCoveredDataChanged const proxyBuilder = options?.onCoveredDataChanged
? createTrackingProxyBuilder( ? createTrackingProxyBuilder(

@ -105,7 +105,7 @@ describe("Link Traversal", () => {
expect(knowNames).toContain("Third User"); expect(knowNames).toContain("Third User");
}); });
it("handles subscriptions if data changes on the Pod", async () => { it.only("handles subscriptions if data changes on the Pod", async () => {
const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI);
await solidLdoDataset await solidLdoDataset
.usingType(SolidProfileShapeShapeType) .usingType(SolidProfileShapeShapeType)
@ -133,15 +133,15 @@ describe("Link Traversal", () => {
console.log("=================="); console.log("==================");
// Update to include a new document // Update data on the Pod
const cMainProfile = changeData(mainProfile, mainProfileResource); await s.authFetch(MAIN_PROFILE_URI, {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment method: "PATCH",
// @ts-ignore body: "INSERT DATA { <http://localhost:3005/test-container/mainProfile.ttl#me> <http://xmlns.com/foaf/0.1/knows> <http://localhost:3005/test-container/thirdProfile.ttl#me> . }",
cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT }); headers: {
await commitData(cMainProfile); "Content-Type": "application/sparql-update",
},
// Wait for 200ms to allow the other file to be fetched });
await wait(200); await wait(1000);
// After the data is committed, the third profile should be present // After the data is committed, the third profile should be present
mainProfile = solidLdoDataset mainProfile = solidLdoDataset

Loading…
Cancel
Save