From 9559bb2b398feb83e4c892e9fdf02435b9093872 Mon Sep 17 00:00:00 2001 From: Jackson Morgan Date: Thu, 3 Apr 2025 14:13:42 -0400 Subject: [PATCH] Completed preliminary nextgraph tests --- .../src/resources/NextGraphResource.ts | 86 ++++++++++++------- .../test/integration.test.ts | 21 ++++- 2 files changed, 73 insertions(+), 34 deletions(-) diff --git a/packages/connected-nextgraph/src/resources/NextGraphResource.ts b/packages/connected-nextgraph/src/resources/NextGraphResource.ts index 85aeb86..01d6ae2 100644 --- a/packages/connected-nextgraph/src/resources/NextGraphResource.ts +++ b/packages/connected-nextgraph/src/resources/NextGraphResource.ts @@ -16,10 +16,13 @@ import type { NextGraphConnectedPlugin } from "../NextGraphConnectedPlugin"; import ng from "nextgraph"; import { changesToSparqlUpdate, type DatasetChanges } from "@ldo/rdf-utils"; import type { NextGraphNotificationMessage } from "../notifications/NextGraphNotificationMessage"; -import type { Quad } from "@rdfjs/types"; +import type { Dataset, Quad } from "@rdfjs/types"; import { namedNode, quad as createQuad } from "@rdfjs/data-model"; import { NextGraphReadSuccess } from "../results/NextGraphReadSuccess"; import { NextGraphNotificationSubscription } from "../notifications/NextGraphNotificationSubscription"; +import { parseRdf } from "@ldo/ldo"; +import type { LdoDataset } from "packages/ldo/dist/LdoDataset"; +import { createDataset } from "@ldo/dataset"; export class NextGraphResource extends (EventEmitter as new () => ResourceEventEmitter) @@ -97,6 +100,22 @@ export class NextGraphResource return error; } + private overwriteQuads(quads: Quad[] | Dataset) { + const dataset = this.context.dataset; + const graphNode = namedNode(this.uri); + dataset.deleteMatches(undefined, undefined, undefined, graphNode); + dataset.addAll( + quads.map((ngQuad) => { + return createQuad( + ngQuad.subject, + ngQuad.predicate, + ngQuad.object, + graphNode, + ); + }), + ); + } + async read(): Promise< NextGraphReadSuccess | UnexpectedResourceError > { @@ -112,19 +131,7 @@ export class NextGraphResource this.uri, ); // Update the dataset - const graphNode = namedNode(this.uri); - const dataset = this.context.dataset; - dataset.deleteMatches(undefined, undefined, undefined, graphNode); - dataset.addAll( - sparqlResult.map((ngQuad) => { - return createQuad( - ngQuad.subject, - ngQuad.predicate, - ngQuad.object, - graphNode, - ); - }), - ); + this.overwriteQuads(sparqlResult); // Update statuses const result = new NextGraphReadSuccess(this, false); @@ -187,35 +194,50 @@ export class NextGraphResource } } + private async notificationToQuads( + notificationString: string, + ): Promise> { + const rawTriples = JSON.parse(notificationString); + const triples = ( + await Promise.all( + rawTriples.map(async (rawTriple) => + parseRdf(`${rawTriple}.`, { baseIRI: this.uri }), + ), + ) + ).reduce((agg, ldoDataset) => { + ldoDataset.forEach((quad) => { + agg.add(quad); + }); + return agg; + }, createDataset()); + return triples; + } + protected async onNotification(response: NextGraphNotificationMessage) { if (response.V0.State?.graph) { const json_str = new TextDecoder().decode( response.V0.State.graph.triples, ); - const triples = JSON.parse(json_str); - - for (const triple of triples) { - // deal with each triple - console.log("STATE", triple); - } + const triples = await this.notificationToQuads(json_str); + this.overwriteQuads(triples); } else if (response.V0.Patch?.graph) { - const inserts_json_str = new TextDecoder().decode( + const insertsString = new TextDecoder().decode( response.V0.Patch.graph.inserts, ); - const inserts = JSON.parse(inserts_json_str); - const removes_json_str = new TextDecoder().decode( + const removesString = new TextDecoder().decode( response.V0.Patch.graph.removes, ); - const removes = JSON.parse(removes_json_str); - for (const insert of inserts) { - // deal with each insert - console.log("INSERT", insert); - } - for (const remove of removes) { - // deal with each remove - console.log("REMOVE", remove); - } + const [added, removed] = await Promise.all( + [insertsString, removesString].map(async (str) => { + return this.notificationToQuads(str); + }), + ); + + this.context.dataset.bulk({ + added, + removed, + }); } } diff --git a/packages/connected-nextgraph/test/integration.test.ts b/packages/connected-nextgraph/test/integration.test.ts index 5e1d901..567ec5b 100644 --- a/packages/connected-nextgraph/test/integration.test.ts +++ b/packages/connected-nextgraph/test/integration.test.ts @@ -9,7 +9,7 @@ import { createNextGraphLdoDataset } from "../src/createNextGraphLdoDataset"; import { parseRdf } from "@ldo/ldo"; import { namedNode } from "@rdfjs/data-model"; import type { NextGraphReadSuccess } from "../src/results/NextGraphReadSuccess"; -import { rm, mkdir, cp, readdir } from "fs/promises"; +import { rm, cp } from "fs/promises"; import path from "path"; const SAMPLE_TTL = `@base . @@ -79,7 +79,7 @@ describe("NextGraph Plugin", () => { }); }); - describe("readResource", () => { + describe("read and subscribe", () => { let populatedResourceUri: NextGraphUri; beforeEach(async () => { const resource = (await nextgraphLdoDataset.createResource( @@ -139,5 +139,22 @@ describe("NextGraph Plugin", () => { expect(result.type).toBe("nextGraphReadSuccess"); expect(result.recalledFromMemory).toBe(true); }); + + it("Subscribes to a resource", async () => { + const resource = nextgraphLdoDataset.getResource(populatedResourceUri); + await resource.subscribeToNotifications(); + // Wait for subscription + await new Promise((resolve) => setTimeout(resolve, 200)); + expect(nextgraphLdoDataset.size).toBe(7); + expect( + nextgraphLdoDataset.match( + namedNode("http://example.org/#spiderman"), + namedNode("http://www.perceive.net/schemas/relationship/enemyOf"), + namedNode("http://example.org/#green-goblin"), + namedNode(resource.uri), + ).size, + ).toBe(1); + await resource.unsubscribeFromAllNotifications(); + }); }); });