|
|
|
@ -16,10 +16,13 @@ import type { NextGraphConnectedPlugin } from "../NextGraphConnectedPlugin"; |
|
|
|
|
import ng from "nextgraph"; |
|
|
|
|
import { changesToSparqlUpdate, type DatasetChanges } from "@ldo/rdf-utils"; |
|
|
|
|
import type { NextGraphNotificationMessage } from "../notifications/NextGraphNotificationMessage"; |
|
|
|
|
import type { Quad } from "@rdfjs/types"; |
|
|
|
|
import type { Dataset, Quad } from "@rdfjs/types"; |
|
|
|
|
import { namedNode, quad as createQuad } from "@rdfjs/data-model"; |
|
|
|
|
import { NextGraphReadSuccess } from "../results/NextGraphReadSuccess"; |
|
|
|
|
import { NextGraphNotificationSubscription } from "../notifications/NextGraphNotificationSubscription"; |
|
|
|
|
import { parseRdf } from "@ldo/ldo"; |
|
|
|
|
import type { LdoDataset } from "packages/ldo/dist/LdoDataset"; |
|
|
|
|
import { createDataset } from "@ldo/dataset"; |
|
|
|
|
|
|
|
|
|
export class NextGraphResource |
|
|
|
|
extends (EventEmitter as new () => ResourceEventEmitter) |
|
|
|
@ -97,6 +100,22 @@ export class NextGraphResource |
|
|
|
|
return error; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private overwriteQuads(quads: Quad[] | Dataset<Quad>) { |
|
|
|
|
const dataset = this.context.dataset; |
|
|
|
|
const graphNode = namedNode(this.uri); |
|
|
|
|
dataset.deleteMatches(undefined, undefined, undefined, graphNode); |
|
|
|
|
dataset.addAll( |
|
|
|
|
quads.map((ngQuad) => { |
|
|
|
|
return createQuad( |
|
|
|
|
ngQuad.subject, |
|
|
|
|
ngQuad.predicate, |
|
|
|
|
ngQuad.object, |
|
|
|
|
graphNode, |
|
|
|
|
); |
|
|
|
|
}), |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async read(): Promise< |
|
|
|
|
NextGraphReadSuccess | UnexpectedResourceError<NextGraphResource> |
|
|
|
|
> { |
|
|
|
@ -112,19 +131,7 @@ export class NextGraphResource |
|
|
|
|
this.uri, |
|
|
|
|
); |
|
|
|
|
// Update the dataset
|
|
|
|
|
const graphNode = namedNode(this.uri); |
|
|
|
|
const dataset = this.context.dataset; |
|
|
|
|
dataset.deleteMatches(undefined, undefined, undefined, graphNode); |
|
|
|
|
dataset.addAll( |
|
|
|
|
sparqlResult.map((ngQuad) => { |
|
|
|
|
return createQuad( |
|
|
|
|
ngQuad.subject, |
|
|
|
|
ngQuad.predicate, |
|
|
|
|
ngQuad.object, |
|
|
|
|
graphNode, |
|
|
|
|
); |
|
|
|
|
}), |
|
|
|
|
); |
|
|
|
|
this.overwriteQuads(sparqlResult); |
|
|
|
|
|
|
|
|
|
// Update statuses
|
|
|
|
|
const result = new NextGraphReadSuccess(this, false); |
|
|
|
@ -187,35 +194,50 @@ export class NextGraphResource |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private async notificationToQuads( |
|
|
|
|
notificationString: string, |
|
|
|
|
): Promise<Dataset<Quad>> { |
|
|
|
|
const rawTriples = JSON.parse(notificationString); |
|
|
|
|
const triples = ( |
|
|
|
|
await Promise.all<LdoDataset>( |
|
|
|
|
rawTriples.map(async (rawTriple) => |
|
|
|
|
parseRdf(`${rawTriple}.`, { baseIRI: this.uri }), |
|
|
|
|
), |
|
|
|
|
) |
|
|
|
|
).reduce((agg, ldoDataset) => { |
|
|
|
|
ldoDataset.forEach((quad) => { |
|
|
|
|
agg.add(quad); |
|
|
|
|
}); |
|
|
|
|
return agg; |
|
|
|
|
}, createDataset()); |
|
|
|
|
return triples; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
protected async onNotification(response: NextGraphNotificationMessage) { |
|
|
|
|
if (response.V0.State?.graph) { |
|
|
|
|
const json_str = new TextDecoder().decode( |
|
|
|
|
response.V0.State.graph.triples, |
|
|
|
|
); |
|
|
|
|
const triples = JSON.parse(json_str); |
|
|
|
|
|
|
|
|
|
for (const triple of triples) { |
|
|
|
|
// deal with each triple
|
|
|
|
|
console.log("STATE", triple); |
|
|
|
|
} |
|
|
|
|
const triples = await this.notificationToQuads(json_str); |
|
|
|
|
this.overwriteQuads(triples); |
|
|
|
|
} else if (response.V0.Patch?.graph) { |
|
|
|
|
const inserts_json_str = new TextDecoder().decode( |
|
|
|
|
const insertsString = new TextDecoder().decode( |
|
|
|
|
response.V0.Patch.graph.inserts, |
|
|
|
|
); |
|
|
|
|
const inserts = JSON.parse(inserts_json_str); |
|
|
|
|
const removes_json_str = new TextDecoder().decode( |
|
|
|
|
const removesString = new TextDecoder().decode( |
|
|
|
|
response.V0.Patch.graph.removes, |
|
|
|
|
); |
|
|
|
|
const removes = JSON.parse(removes_json_str); |
|
|
|
|
|
|
|
|
|
for (const insert of inserts) { |
|
|
|
|
// deal with each insert
|
|
|
|
|
console.log("INSERT", insert); |
|
|
|
|
} |
|
|
|
|
for (const remove of removes) { |
|
|
|
|
// deal with each remove
|
|
|
|
|
console.log("REMOVE", remove); |
|
|
|
|
} |
|
|
|
|
const [added, removed] = await Promise.all( |
|
|
|
|
[insertsString, removesString].map(async (str) => { |
|
|
|
|
return this.notificationToQuads(str); |
|
|
|
|
}), |
|
|
|
|
); |
|
|
|
|
|
|
|
|
|
this.context.dataset.bulk({ |
|
|
|
|
added, |
|
|
|
|
removed, |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|