Subscriptions work locally

main
Jackson Morgan 4 months ago
parent 89d2f7c88a
commit 46be390649
  1. 41
      package-lock.json
  2. 3
      packages/connected/package.json
  3. 32
      packages/connected/src/linkTraversal/ResourceLinkQuery.ts
  4. 21
      packages/connected/src/linkTraversal/exploreLinks.ts
  5. 2
      packages/connected/test/LinkTraversalData.ts
  6. 60
      packages/connected/test/LinkTraversalIntegration.test.ts
  7. 3
      packages/connected/test/util/wait.ts
  8. 4
      packages/subscribable-dataset/package.json
  9. 3
      packages/subscribable-dataset/src/SubscribableDataset.ts
  10. 1
      packages/subscribable-dataset/src/types.ts
  11. 10
      packages/subscribable-dataset/test/SubscribableDataset.test.ts

41
package-lock.json generated

@ -25939,7 +25939,8 @@
"ts-node": "^10.9.1",
"typed-emitter": "^2.1.0",
"typedoc": "^0.25.4",
"typedoc-plugin-markdown": "^3.17.1"
"typedoc-plugin-markdown": "^3.17.1",
"uuid": "^11.1.0"
}
},
"packages/connected-nextgraph": {
@ -26003,6 +26004,20 @@
"typedoc-plugin-markdown": "^3.17.1"
}
},
"packages/connected/node_modules/uuid": {
"version": "11.1.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz",
"integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==",
"dev": true,
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"license": "MIT",
"bin": {
"uuid": "dist/esm/bin/uuid"
}
},
"packages/dataset": {
"name": "@ldo/dataset",
"version": "1.0.0-alpha.3",
@ -26287,7 +26302,8 @@
"license": "MIT",
"dependencies": {
"@ldo/dataset": "^1.0.0-alpha.3",
"@ldo/rdf-utils": "^1.0.0-alpha.3"
"@ldo/rdf-utils": "^1.0.0-alpha.3",
"uuid": "^11.1.0"
},
"devDependencies": {
"@rdfjs/data-model": "^1.2.0",
@ -26295,9 +26311,17 @@
"@rdfjs/types": "^1.0.1",
"@types/jsonld": "^1.5.6",
"@types/rdfjs__dataset": "^1.0.4",
"@types/uuid": "^10.0.0",
"ts-node": "^9.1.1"
}
},
"packages/subscribable-dataset/node_modules/@types/uuid": {
"version": "10.0.0",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz",
"integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==",
"dev": true,
"license": "MIT"
},
"packages/subscribable-dataset/node_modules/source-map-support": {
"version": "0.5.21",
"dev": true,
@ -26332,6 +26356,19 @@
"typescript": ">=2.7"
}
},
"packages/subscribable-dataset/node_modules/uuid": {
"version": "11.1.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz",
"integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==",
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"license": "MIT",
"bin": {
"uuid": "dist/esm/bin/uuid"
}
},
"packages/test-solid-server": {
"name": "@ldo/test-solid-server",
"version": "1.0.0-alpha.8",

@ -33,7 +33,8 @@
"ts-node": "^10.9.1",
"typed-emitter": "^2.1.0",
"typedoc": "^0.25.4",
"typedoc-plugin-markdown": "^3.17.1"
"typedoc-plugin-markdown": "^3.17.1",
"uuid": "^11.1.0"
},
"dependencies": {
"@ldo/dataset": "^1.0.0-alpha.3",

@ -10,6 +10,9 @@ import type { SubjectNode } from "@ldo/rdf-utils";
import { exploreLinks } from "./exploreLinks";
import type { IConnectedLdoDataset } from "../types/IConnectedLdoDataset";
import type { IConnectedLdoBuilder } from "../types/IConnectedLdoBuilder";
import { v4 } from "uuid";
import type { nodeEventListener } from "@ldo/subscribable-dataset";
import type { Quad } from "@rdfjs/types";
export class ResourceLinkQuery<
Type extends LdoBase,
@ -22,6 +25,7 @@ export class ResourceLinkQuery<
// uri -> unsubscribeId
protected resourceUnsubscribeIds: Record<string, string> = {};
protected thisUnsubscribeIds: Set<string> = new Set();
protected previousTransactionId: string = "INIT";
constructor(
protected parentDataset: IConnectedLdoDataset<Plugins>,
@ -47,15 +51,39 @@ export class ResourceLinkQuery<
}
async subscribe(): Promise<string> {
const subscriptionId = v4();
const onDataChanged: nodeEventListener<Quad> = async (
_changes,
transactionId: string,
_triggering,
) => {
console.log(
`Transaction ID: ${transactionId}\ntriggering: [${_triggering[0]
?.value}, ${_triggering[1]?.value}, ${_triggering[2]
?.value}, ${_triggering[3]
?.value}]\nadded: ${_changes.added?.toString()}\nremoved:${_changes.removed?.toString()}`,
);
// Set a transaction Id, so that we only trigger one re-render
if (transactionId === this.previousTransactionId) return;
this.previousTransactionId = transactionId;
// Remove previous registration
this.parentDataset.removeListenerFromAllEvents(onDataChanged);
// Explore the links, with a subscription to re-explore the links if any
// covered information changes
await exploreLinks(
this.parentDataset,
this.shapeType,
this.startingResource,
this.startingSubject,
this.linkQueryInput,
{},
{
onCoveredDataChanged: onDataChanged,
},
);
return "string";
};
await onDataChanged({}, "BEGIN_SUB", [null, null, null, null]);
return subscriptionId;
}
private async fullUnsubscribe(): Promise<void> {

@ -5,11 +5,14 @@ import type { LQInput } from "../types/ILinkQuery";
import { BasicLdSet } from "@ldo/jsonld-dataset-proxy";
import type { IConnectedLdoDataset } from "../types/IConnectedLdoDataset";
import { createTrackingProxyBuilder } from "../trackingProxy/createTrackingProxy";
import type { nodeEventListener } from "@ldo/subscribable-dataset";
import type { Quad } from "@rdfjs/types";
interface ExploreLinksOptions<Plugins extends ConnectedPlugin[]> {
onResourceEncountered?: (
resource: Plugins[number]["types"]["resource"],
) => void;
onCoveredDataChanged?: nodeEventListener<Quad>;
shouldRefreshResources?: boolean;
}
@ -30,15 +33,17 @@ export async function exploreLinks<
: await startingResource.readIfUnfetched();
if (readResult.isError) return;
const trackingProxyBuilder = createTrackingProxyBuilder(
if (options?.onResourceEncountered)
options?.onResourceEncountered(startingResource);
const proxyBuilder = options?.onCoveredDataChanged
? createTrackingProxyBuilder(
dataset,
shapeType,
(changes) =>
console.log(
`Got Update \nadded: ${changes.added?.toString()}\nremoved: ${changes.removed?.toString()}`,
),
);
const ldObject = trackingProxyBuilder.fromSubject(startingSubject);
options?.onCoveredDataChanged,
)
: dataset.usingType(shapeType);
const ldObject = proxyBuilder.fromSubject(startingSubject);
const fetchedDuringThisExploration = new Set<string>([startingResource.uri]);
@ -77,6 +82,8 @@ export async function exploreLinksRecursive<
if (readResult.isError) {
return;
}
if (options?.onResourceEncountered)
options.onResourceEncountered(resourceToFetch);
fetchedDuringThisExploration.add(resourceToFetch.uri);
}
// Recurse through the other elemenets

@ -5,7 +5,7 @@ export const MAIN_PROFILE_URI = `${BASE_CONTAINER}mainProfile.ttl`;
export const MAIN_PROFILE_SUBJECT = `${MAIN_PROFILE_URI}#me`;
export const OTHER_PROFILE_URI = `${BASE_CONTAINER}otherProfile.ttl`;
export const OTHER_PROFILE_SUBJECT = `${OTHER_PROFILE_URI}#me`;
export const THIRD_PROFILE_URI = `${BASE_CONTAINER}otherProfile.ttl`;
export const THIRD_PROFILE_URI = `${BASE_CONTAINER}thirdProfile.ttl`;
export const THIRD_PROFILE_SUBJECT = `${THIRD_PROFILE_URI}#me`;
export const linkTraversalData: ResourceInfo = {

@ -14,6 +14,7 @@ import {
THIRD_PROFILE_URI,
} from "./LinkTraversalData";
import { SolidProfileShapeShapeType } from "./.ldo/solidProfile.shapeTypes";
import { wait } from "./util/wait";
describe("Link Traversal", () => {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
@ -50,7 +51,61 @@ describe("Link Traversal", () => {
expect(data.knows?.toArray()[0].name).toBe("Other User");
});
it("handles subscriptions if data changes", async () => {
it("handles subscriptions if data changes locally", async () => {
const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI);
await solidLdoDataset
.usingType(SolidProfileShapeShapeType)
.startLinkQuery(mainProfileResource, MAIN_PROFILE_SUBJECT, {
name: true,
knows: {
name: true,
},
})
.subscribe();
// Should have regular information
let mainProfile = solidLdoDataset
.usingType(SolidProfileShapeShapeType)
.fromSubject(MAIN_PROFILE_SUBJECT);
let resourceUris = solidLdoDataset
.getResources()
.map((resource) => resource.uri);
expect(resourceUris.length).toBe(3);
expect(resourceUris).toContain(MAIN_PROFILE_URI);
expect(resourceUris).toContain(OTHER_PROFILE_URI);
expect(mainProfile.name).toBe("Main User");
expect(mainProfile.knows?.size).toBe(1);
expect(mainProfile.knows?.toArray()[0].name).toBe("Other User");
// Update to include a new document
const cMainProfile = changeData(mainProfile, mainProfileResource);
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT });
await commitData(cMainProfile);
// Wait for 200ms to allow the other file to be fetched
await wait(200);
// After the data is committed, the third profile should be present
mainProfile = solidLdoDataset
.usingType(SolidProfileShapeShapeType)
.fromSubject(MAIN_PROFILE_SUBJECT);
resourceUris = solidLdoDataset
.getResources()
.map((resource) => resource.uri);
expect(resourceUris.length).toBe(4);
expect(resourceUris).toContain(MAIN_PROFILE_URI);
expect(resourceUris).toContain(OTHER_PROFILE_URI);
expect(resourceUris).toContain(THIRD_PROFILE_URI);
expect(mainProfile.name).toBe("Main User");
expect(mainProfile.knows?.size).toBe(2);
const knowNames = mainProfile.knows?.map((knowsPerson) => knowsPerson.name);
expect(knowNames).toContain("Other User");
expect(knowNames).toContain("Third User");
});
it("handles subscriptions if data changes on the Pod", async () => {
const mainProfileResource = solidLdoDataset.getResource(MAIN_PROFILE_URI);
await solidLdoDataset
.usingType(SolidProfileShapeShapeType)
@ -85,6 +140,9 @@ describe("Link Traversal", () => {
cMainProfile.knows?.add({ "@id": THIRD_PROFILE_SUBJECT });
await commitData(cMainProfile);
// Wait for 200ms to allow the other file to be fetched
await wait(200);
// After the data is committed, the third profile should be present
mainProfile = solidLdoDataset
.usingType(SolidProfileShapeShapeType)

@ -0,0 +1,3 @@
export async function wait(time: number) {
return new Promise((resolve) => setTimeout(resolve, time));
}

@ -27,11 +27,13 @@
"@rdfjs/types": "^1.0.1",
"@types/jsonld": "^1.5.6",
"@types/rdfjs__dataset": "^1.0.4",
"@types/uuid": "^10.0.0",
"ts-node": "^9.1.1"
},
"dependencies": {
"@ldo/dataset": "^1.0.0-alpha.3",
"@ldo/rdf-utils": "^1.0.0-alpha.3"
"@ldo/rdf-utils": "^1.0.0-alpha.3",
"uuid": "^11.1.0"
},
"files": [
"dist",

@ -16,6 +16,7 @@ import type {
ITransactionDatasetFactory,
} from "./types";
import { ExtendedDataset } from "@ldo/dataset";
import { v4 } from "uuid";
/**
* A wrapper for a dataset that allows subscriptions to be made on nodes to
@ -239,12 +240,14 @@ export class SubscribableDataset<InAndOutQuad extends BaseQuad = BaseQuad>
populateMatchingDatasetChanges("added");
populateMatchingDatasetChanges("removed");
const transactionId = v4();
// Alert all listeners
Object.entries(matchingDatasetChanges).forEach(
([quadMatchString, info]) => {
this.eventEmitter.emit(
quadMatchString,
info.changes,
transactionId,
info.triggerQuadMatch,
);
},

@ -6,6 +6,7 @@ import type { Dataset, BaseQuad, DatasetFactory } from "@rdfjs/types";
*/
export type nodeEventListener<InAndOutQuad extends BaseQuad = BaseQuad> = (
changes: DatasetChanges<InAndOutQuad>,
transactionId: string,
triggeringQuadMatch: QuadMatch,
) => void;

@ -62,7 +62,7 @@ describe("SubscribableDataset", () => {
expect(callbackFunc).toBeCalledTimes(1);
expect(callbackFunc.mock.calls[0][0].added.size).toBe(1);
expect(callbackFunc.mock.calls[0][0].added.has(tomColorQuad)).toBe(true);
expect(callbackFunc.mock.calls[0][1]).toEqual([
expect(callbackFunc.mock.calls[0][2]).toEqual([
namedNode("http://example.org/cartoons#Tom"),
null,
null,
@ -80,7 +80,7 @@ describe("SubscribableDataset", () => {
expect(callbackFunc).toBeCalledTimes(1);
expect(callbackFunc.mock.calls[0][0].removed.size).toBe(1);
expect(callbackFunc.mock.calls[0][0].removed.has(tomTypeQuad)).toBe(true);
expect(callbackFunc.mock.calls[0][1]).toEqual([
expect(callbackFunc.mock.calls[0][2]).toEqual([
namedNode("http://example.org/cartoons#Tom"),
null,
null,
@ -99,7 +99,7 @@ describe("SubscribableDataset", () => {
expect(callbackFunc.mock.calls[0][0].added.size).toBe(2);
expect(callbackFunc.mock.calls[0][0].added.has(lickyNameQuad)).toBe(true);
expect(callbackFunc.mock.calls[0][0].added.has(lickyTypeQuad)).toBe(true);
expect(callbackFunc.mock.calls[0][1]).toEqual([
expect(callbackFunc.mock.calls[0][2]).toEqual([
namedNode("http://example.org/cartoons#Licky"),
null,
null,
@ -123,7 +123,7 @@ describe("SubscribableDataset", () => {
),
).toBe(true);
expect(callbackFuncLicky.mock.calls[0][0].removed).toBe(undefined);
expect(callbackFuncLicky.mock.calls[0][1]).toEqual([
expect(callbackFuncLicky.mock.calls[0][2]).toEqual([
namedNode("http://example.org/cartoons#Licky"),
null,
null,
@ -147,7 +147,7 @@ describe("SubscribableDataset", () => {
),
).toBe(true);
expect(callbackFuncTom.mock.calls[0][0].added).toBe(undefined);
expect(callbackFuncTom.mock.calls[0][1]).toEqual([
expect(callbackFuncTom.mock.calls[0][2]).toEqual([
namedNode("http://example.org/cartoons#Tom"),
null,
null,

Loading…
Cancel
Save