Tests work, but there's a memory leak

main
Jackson Morgan 4 months ago
parent 0ac81bb1bf
commit 7ed9ba5795
  1. 15
      package-lock.json
  2. 2
      packages/connected/jest.config.js
  3. 1
      packages/connected/package.json
  4. 4
      packages/connected/src/ConnectedLdoBuilder.ts
  5. 32
      packages/connected/src/linkTraversal/ResourceLinkQuery.ts
  6. 26
      packages/connected/src/linkTraversal/exploreLinks.ts
  7. 48
      packages/connected/test/LinkTraversalIntegration.test.ts

15
package-lock.json generated

@ -24051,9 +24051,9 @@
"license": "MIT" "license": "MIT"
}, },
"node_modules/ts-jest": { "node_modules/ts-jest": {
"version": "29.3.0", "version": "29.3.2",
"resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.3.0.tgz", "resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.3.2.tgz",
"integrity": "sha512-4bfGBX7Gd1Aqz3SyeDS9O276wEU/BInZxskPrbhZLyv+c1wskDCqDFMJQJLWrIr/fKoAH4GE5dKUlrdyvo+39A==", "integrity": "sha512-bJJkrWc6PjFVz5g2DGCNUo8z7oFEYaz1xP1NpeDU7KNLMWPpEyV8Chbpkn8xjzgRDpQhnGMyvyldoL7h8JXyug==",
"dev": true, "dev": true,
"license": "MIT", "license": "MIT",
"dependencies": { "dependencies": {
@ -24065,7 +24065,7 @@
"lodash.memoize": "^4.1.2", "lodash.memoize": "^4.1.2",
"make-error": "^1.3.6", "make-error": "^1.3.6",
"semver": "^7.7.1", "semver": "^7.7.1",
"type-fest": "^4.37.0", "type-fest": "^4.39.1",
"yargs-parser": "^21.1.1" "yargs-parser": "^21.1.1"
}, },
"bin": { "bin": {
@ -24114,9 +24114,9 @@
} }
}, },
"node_modules/ts-jest/node_modules/type-fest": { "node_modules/ts-jest/node_modules/type-fest": {
"version": "4.38.0", "version": "4.40.1",
"resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.38.0.tgz", "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.40.1.tgz",
"integrity": "sha512-2dBz5D5ycHIoliLYLi0Q2V7KRaDlH0uWIvmk7TYlAg5slqwiPv1ezJdZm1QEM0xgk29oYWMCbIG7E6gHpvChlg==", "integrity": "sha512-9YvLNnORDpI+vghLU/Nf+zSv0kL47KbVJ1o3sKgoTefl6i+zebxbiDQWoe/oWWqPhIgQdRZRT1KA9sCPL810SA==",
"dev": true, "dev": true,
"license": "(MIT OR CC0-1.0)", "license": "(MIT OR CC0-1.0)",
"engines": { "engines": {
@ -25936,6 +25936,7 @@
"@rdfjs/types": "^1.0.1", "@rdfjs/types": "^1.0.1",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"jest-rdf": "^1.8.0", "jest-rdf": "^1.8.0",
"ts-jest": "^29.3.2",
"ts-node": "^10.9.1", "ts-node": "^10.9.1",
"typed-emitter": "^2.1.0", "typed-emitter": "^2.1.0",
"typedoc": "^0.25.4", "typedoc": "^0.25.4",

@ -5,6 +5,6 @@ module.exports = {
rootDir: "./", rootDir: "./",
transform: { transform: {
"^.+\\.(ts|tsx)?$": "ts-jest", "^.+\\.(ts|tsx)?$": "ts-jest",
"^.+\\.(js|jsx)$": "babel-jest",
}, },
coveragePathIgnorePatterns: ["/node_modules/", "/dist/"],
}; };

@ -30,6 +30,7 @@
"@rdfjs/types": "^1.0.1", "@rdfjs/types": "^1.0.1",
"cross-env": "^7.0.3", "cross-env": "^7.0.3",
"jest-rdf": "^1.8.0", "jest-rdf": "^1.8.0",
"ts-jest": "^29.3.2",
"ts-node": "^10.9.1", "ts-node": "^10.9.1",
"typed-emitter": "^2.1.0", "typed-emitter": "^2.1.0",
"typedoc": "^0.25.4", "typedoc": "^0.25.4",

@ -3,7 +3,7 @@ import { LdoBuilder } from "@ldo/ldo";
import type { IConnectedLdoBuilder } from "./types/IConnectedLdoBuilder"; import type { IConnectedLdoBuilder } from "./types/IConnectedLdoBuilder";
import type { JsonldDatasetProxyBuilder } from "@ldo/jsonld-dataset-proxy"; import type { JsonldDatasetProxyBuilder } from "@ldo/jsonld-dataset-proxy";
import type { SubjectNode } from "@ldo/rdf-utils"; import type { SubjectNode } from "@ldo/rdf-utils";
import type { LQInput, ILinkQuery } from "./types/ILinkQuery"; import type { LQInput } from "./types/ILinkQuery";
import { ResourceLinkQuery } from "./linkTraversal/ResourceLinkQuery"; import { ResourceLinkQuery } from "./linkTraversal/ResourceLinkQuery";
import type { ConnectedPlugin } from "./types/ConnectedPlugin"; import type { ConnectedPlugin } from "./types/ConnectedPlugin";
import type { IConnectedLdoDataset } from "./types/IConnectedLdoDataset"; import type { IConnectedLdoDataset } from "./types/IConnectedLdoDataset";
@ -30,7 +30,7 @@ export class ConnectedLdoBuilder<
startingResource: Plugins[number]["types"]["resource"], startingResource: Plugins[number]["types"]["resource"],
startingSubject: SubjectNode | string, startingSubject: SubjectNode | string,
linkQueryInput: Input, linkQueryInput: Input,
): ILinkQuery<Type, Input> { ): ResourceLinkQuery<Type, Input, Plugins> {
return new ResourceLinkQuery( return new ResourceLinkQuery(
this.parentDataset, this.parentDataset,
this.shapeType, this.shapeType,

@ -20,8 +20,6 @@ export class ResourceLinkQuery<
Plugins extends ConnectedPlugin[], Plugins extends ConnectedPlugin[],
> implements ILinkQuery<Type, QueryInput> > implements ILinkQuery<Type, QueryInput>
{ {
protected trackedResources: Set<Plugins[number]["types"]["resource"]> =
new Set();
protected previousTransactionId: string = "INIT"; protected previousTransactionId: string = "INIT";
// Resource Subscriptions uri -> unsubscribeId // Resource Subscriptions uri -> unsubscribeId
@ -29,6 +27,8 @@ export class ResourceLinkQuery<
// Unsubscribe IDs for this ResourceLinkQuery // Unsubscribe IDs for this ResourceLinkQuery
protected thisUnsubscribeIds = new Set<string>(); protected thisUnsubscribeIds = new Set<string>();
protected curOnDataChanged: nodeEventListener<Quad> | undefined;
constructor( constructor(
protected parentDataset: IConnectedLdoDataset<Plugins>, protected parentDataset: IConnectedLdoDataset<Plugins>,
protected shapeType: ShapeType<Type>, protected shapeType: ShapeType<Type>,
@ -54,7 +54,13 @@ export class ResourceLinkQuery<
async subscribe(): Promise<string> { async subscribe(): Promise<string> {
const subscriptionId = v4(); const subscriptionId = v4();
const onDataChanged: nodeEventListener<Quad> = async ( this.thisUnsubscribeIds.add(subscriptionId);
// If there's already a registered onDataChange, we don't need to make a new
// on for this new subscription
if (this.curOnDataChanged) {
return subscriptionId;
}
this.curOnDataChanged = async (
_changes, _changes,
transactionId: string, transactionId: string,
_triggering, _triggering,
@ -69,7 +75,7 @@ export class ResourceLinkQuery<
if (transactionId === this.previousTransactionId) return; if (transactionId === this.previousTransactionId) return;
this.previousTransactionId = transactionId; this.previousTransactionId = transactionId;
// Remove previous registration // Remove previous registration
this.parentDataset.removeListenerFromAllEvents(onDataChanged); this.parentDataset.removeListenerFromAllEvents(this.curOnDataChanged!);
// Explore the links, with a subscription to re-explore the links if any // Explore the links, with a subscription to re-explore the links if any
// covered information changes // covered information changes
@ -83,8 +89,9 @@ export class ResourceLinkQuery<
this.startingSubject, this.startingSubject,
this.linkQueryInput, this.linkQueryInput,
{ {
onCoveredDataChanged: onDataChanged, onCoveredDataChanged: this.curOnDataChanged,
onResourceEncountered: async (resource) => { onResourceEncountered: async (resource) => {
console.log(`RESOURCE ENCOUNTERED! ${resource.uri}`);
// No need to do anything if we're already subscribed // No need to do anything if we're already subscribed
if (resourcesCurrentlySubscribedTo.has(resource.uri)) { if (resourcesCurrentlySubscribedTo.has(resource.uri)) {
console.log(`No need to subscirbe to ${resource.uri}`); console.log(`No need to subscirbe to ${resource.uri}`);
@ -94,18 +101,20 @@ export class ResourceLinkQuery<
// Otherwise begin the subscription // Otherwise begin the subscription
console.log(`Subscirbing to ${resource.uri}`); console.log(`Subscirbing to ${resource.uri}`);
const unsubscribeId = await resource.subscribeToNotifications(); const unsubscribeId = await resource.subscribeToNotifications();
console.log(`Add to active subscriptions ${resource.uri}`);
this.activeResourceSubscriptions[resource.uri] = unsubscribeId; this.activeResourceSubscriptions[resource.uri] = unsubscribeId;
}, },
}, },
); );
// Clean up unused subscriptions // Clean up unused subscriptions
console.log("Cleaning these up", resourcesCurrentlySubscribedTo);
await Promise.all( await Promise.all(
Array.from(resourcesCurrentlySubscribedTo).map(async (uri) => Array.from(resourcesCurrentlySubscribedTo).map(async (uri) =>
this.unsubscribeFromResource(uri), this.unsubscribeFromResource(uri),
), ),
); );
}; };
await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]); await this.curOnDataChanged({}, "BEGIN_SUB", [null, null, null, null]);
return subscriptionId; return subscriptionId;
} }
@ -117,10 +126,15 @@ export class ResourceLinkQuery<
} }
private async fullUnsubscribe(): Promise<void> { private async fullUnsubscribe(): Promise<void> {
console.log("Unsubscribing");
if (this.curOnDataChanged) {
this.parentDataset.removeListenerFromAllEvents(this.curOnDataChanged);
this.curOnDataChanged = undefined;
}
await Promise.all( await Promise.all(
Object.keys(this.activeResourceSubscriptions).map(async (uri) => Object.keys(this.activeResourceSubscriptions).map(async (uri) => {
this.unsubscribeFromResource(uri), this.unsubscribeFromResource(uri);
), }),
); );
} }

@ -45,14 +45,16 @@ export async function exploreLinks<
: dataset.usingType(shapeType); : dataset.usingType(shapeType);
const ldObject = proxyBuilder.fromSubject(startingSubject); const ldObject = proxyBuilder.fromSubject(startingSubject);
const fetchedDuringThisExploration = new Set<string>([startingResource.uri]); const encounteredDuringThisExploration = new Set<string>([
startingResource.uri,
]);
// Recursively explore the rest // Recursively explore the rest
await exploreLinksRecursive( await exploreLinksRecursive(
dataset, dataset,
ldObject, ldObject,
queryInput, queryInput,
fetchedDuringThisExploration, encounteredDuringThisExploration,
options, options,
); );
} }
@ -64,17 +66,17 @@ export async function exploreLinksRecursive<
dataset: IConnectedLdoDataset<Plugins>, dataset: IConnectedLdoDataset<Plugins>,
ldObject: Type, ldObject: Type,
queryInput: LQInput<Type>, queryInput: LQInput<Type>,
fetchedDuringThisExploration: Set<string>, encounteredDuringThisExploration: Set<string>,
options?: ExploreLinksOptions<Plugins>, options?: ExploreLinksOptions<Plugins>,
): Promise<void> { ): Promise<void> {
const shouldFetch = shouldFetchResource( const shouldFetch = shouldFetchResource(
dataset, dataset,
ldObject, ldObject,
queryInput, queryInput,
fetchedDuringThisExploration, encounteredDuringThisExploration,
); );
const resourceToFetch = dataset.getResource(ldObject["@id"]);
if (shouldFetch) { if (shouldFetch) {
const resourceToFetch = dataset.getResource(ldObject["@id"]);
const readResult = options?.shouldRefreshResources const readResult = options?.shouldRefreshResources
? await resourceToFetch.read() ? await resourceToFetch.read()
: await resourceToFetch.readIfUnfetched(); : await resourceToFetch.readIfUnfetched();
@ -82,9 +84,11 @@ export async function exploreLinksRecursive<
if (readResult.isError) { if (readResult.isError) {
return; return;
} }
}
if (!encounteredDuringThisExploration.has(resourceToFetch.uri)) {
encounteredDuringThisExploration.add(resourceToFetch.uri);
if (options?.onResourceEncountered) if (options?.onResourceEncountered)
options.onResourceEncountered(resourceToFetch); await options.onResourceEncountered(resourceToFetch);
fetchedDuringThisExploration.add(resourceToFetch.uri);
} }
// Recurse through the other elemenets // Recurse through the other elemenets
await Promise.all( await Promise.all(
@ -101,7 +105,7 @@ export async function exploreLinksRecursive<
dataset, dataset,
item, item,
queryValue, queryValue,
fetchedDuringThisExploration, encounteredDuringThisExploration,
options, options,
); );
}), }),
@ -111,7 +115,7 @@ export async function exploreLinksRecursive<
dataset, dataset,
ldObject[queryKey], ldObject[queryKey],
queryValue, queryValue,
fetchedDuringThisExploration, encounteredDuringThisExploration,
options, options,
); );
} }
@ -130,14 +134,14 @@ export function shouldFetchResource<
dataset: IConnectedLdoDataset<Plugins>, dataset: IConnectedLdoDataset<Plugins>,
ldObject: Type, ldObject: Type,
queryInput: LQInput<Type>, queryInput: LQInput<Type>,
fetchedDuringThisExploration: Set<string>, encounteredDuringThisExploration: Set<string>,
): boolean { ): boolean {
const linkedResourceUri: string | undefined = ldObject["@id"]; const linkedResourceUri: string | undefined = ldObject["@id"];
// If it's a blank node, no need to fetch // If it's a blank node, no need to fetch
if (!linkedResourceUri) return false; if (!linkedResourceUri) return false;
const linkedResource = dataset.getResource(linkedResourceUri); const linkedResource = dataset.getResource(linkedResourceUri);
// If we've already explored the resource in this exporation, do not fetch // If we've already explored the resource in this exporation, do not fetch
if (fetchedDuringThisExploration.has(linkedResource.uri)) return false; if (encounteredDuringThisExploration.has(linkedResource.uri)) return false;
return Object.entries(queryInput).some(([queryKey, queryValue]) => { return Object.entries(queryInput).some(([queryKey, queryValue]) => {
// If value is undefined then no need to fetch // If value is undefined then no need to fetch

@ -15,6 +15,7 @@ import {
} from "./LinkTraversalData"; } from "./LinkTraversalData";
import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes"; import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes";
import { wait } from "./util/wait"; import { wait } from "./util/wait";
import { inspect } from "util";
describe("Link Traversal", () => { describe("Link Traversal", () => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -107,15 +108,16 @@ describe("Link Traversal", () => {
it.only("handles subscriptions if data changes on the Pod", async () => { it.only("handles subscriptions if data changes on the Pod", async () => {
const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI); const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI);
await solidLdoDataset const linkQuery = solidLdoDataset
.usingType(SolidProfileShapeShapeType) .usingType(SolidProfileShapeShapeType)
.startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, { .startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, {
name: true, name: true,
knows: { knows: {
name: true, name: true,
}, },
}) });
.subscribe();
const unsubscribeId = await linkQuery.subscribe();
// Should have regular information // Should have regular information
let mainProfile = solidLdoDataset let mainProfile = solidLdoDataset
@ -131,6 +133,13 @@ describe("Link Traversal", () => {
expect(mainProfile.knows?.size).toBe(1); expect(mainProfile.knows?.size).toBe(1);
expect(mainProfile.knows?.toArray()[0].name).toBe("Other User"); expect(mainProfile.knows?.toArray()[0].name).toBe("Other User");
let subscribedResources = linkQuery
.getSubscribedResources()
.map((resource) => resource.uri);
expect(subscribedResources.length).toBe(2);
expect(subscribedResources).toContain(MAIN_PROFILE_URI);
expect(subscribedResources).toContain(OTHER_PROFILE_URI);
console.log("=================="); console.log("==================");
// Update data on the Pod // Update data on the Pod
@ -159,5 +168,38 @@ describe("Link Traversal", () => {
const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name); const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name);
expect(knowNames).toContain("Other User"); expect(knowNames).toContain("Other User");
expect(knowNames).toContain("Third User"); expect(knowNames).toContain("Third User");
subscribedResources = linkQuery
.getSubscribedResources()
.map((resource) => resource.uri);
console.log("Subscribed Resources", subscribedResources);
expect(subscribedResources.length).toBe(3);
expect(subscribedResources).toContain(MAIN_PROFILE_URI);
expect(subscribedResources).toContain(OTHER_PROFILE_URI);
expect(subscribedResources).toContain(THIRD_PROFILE_URI);
// Unsubscribe
await linkQuery.unsubscribe(unsubscribeId);
await wait(200);
s.fetchMock.mockClear();
// Does not update when unsubscribed
await s.authFetch(MAIN_PROFILE_URI, {
method: "PATCH",
body: "INSERT DATA { <http://localhost:3005/test-container/mainProfile.ttl#me> <http://xmlns.com/foaf/0.1/knows> <http://localhost:3005/test-container/fourthProfile.ttl#me> . }",
headers: {
"Content-Type": "application/sparql-update",
},
});
await wait(1000);
expect(s.fetchMock).not.toHaveBeenCalled();
subscribedResources = linkQuery
.getSubscribedResources()
.map((resource) => resource.uri);
console.log("Subscribed Resources", subscribedResources);
expect(subscribedResources.length).toBe(0);
}); });
}); });

Loading…
Cancel
Save