diff --git a/packages/solid-type-index/src/react/useProfile.ts b/packages/solid-type-index/src/react/useProfile.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/solid-type-index/src/react/useType.ts b/packages/solid-type-index/src/react/useType.ts deleted file mode 100644 index e69de29..0000000 diff --git a/packages/solid-type-index/src/react/useTypeIndex.ts b/packages/solid-type-index/src/react/useTypeIndex.ts new file mode 100644 index 0000000..122c069 --- /dev/null +++ b/packages/solid-type-index/src/react/useTypeIndex.ts @@ -0,0 +1,31 @@ +import type { LeafUri } from "@ldo/solid"; +import { useTypeIndexProfile } from "./useTypeIndexProfile"; +import { useEffect, useMemo } from "react"; +import { useLdo } from "@ldo/solid-react"; + +export function useTypeIndex(classUri: string): Promise { + const { dataset } = useLdo(); + + const profile = useTypeIndexProfile(); + const typeIndexUris: string[] = useMemo(() => { + const uris: string[] = []; + profile?.privateTypeIndex?.forEach((indexNode) => { + uris.push(indexNode["@id"]); + }); + profile?.publicTypeIndex?.forEach((indexNode) => { + uris.push(indexNode["@id"]); + }); + }, [profile]); + + useEffect(() => { + const resources = typeIndexUris.map((uri) => dataset.getResource(uri)); + resources.forEach((resource) => { + resource.readIfUnfetched(); + resource.subscribeToNotifications(); + }); + + return () => { + resources.forEach((resource) => resource.unsubscribeFromNotifications()); + } + }, [typeIndexUris]); +} diff --git a/packages/solid-type-index/src/react/useTypeIndexProfile.ts b/packages/solid-type-index/src/react/useTypeIndexProfile.ts new file mode 100644 index 0000000..aa7e0f1 --- /dev/null +++ b/packages/solid-type-index/src/react/useTypeIndexProfile.ts @@ -0,0 +1,10 @@ +import { useResource, useSolidAuth, useSubject } from "@ldo/solid-react"; +import type { TypeIndexProfile } from "../.ldo/profile.typings"; +import { TypeIndexProfileShapeType } from "../.ldo/profile.shapeTypes"; + +export function useTypeIndexProfile(): TypeIndexProfile | undefined { + const { session } = useSolidAuth(); + useResource(session.webId, { subscribe: true }); + const profile = useSubject(TypeIndexProfileShapeType, session.webId); + return profile; +} diff --git a/packages/solid/src/resource/Resource.ts b/packages/solid/src/resource/Resource.ts index 5810428..c21dc89 100644 --- a/packages/solid/src/resource/Resource.ts +++ b/packages/solid/src/resource/Resource.ts @@ -37,9 +37,9 @@ import { setWacRuleForAclUri, type SetWacRuleResult } from "./wac/setWacRule"; import type { LeafUri } from "../util/uriTypes"; import type { NoRootContainerError } from "../requester/results/error/NoRootContainerError"; import type { - CloseSubscriptionResult, NotificationSubscription, - OpenSubscriptionResult, + SubscribeResult, + UnsubscribeResult, } from "./notifications/NotificationSubscription"; import { Websocket2023NotificationSubscription } from "./notifications/Websocket2023NotificationSubscription"; import type { NotificationMessage } from "./notifications/NotificationMessage"; @@ -111,7 +111,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * @internal * Handles notification subscriptions */ - protected notificationSubscription?: NotificationSubscription; + protected notificationSubscription: NotificationSubscription; /** * @param context - SolidLdoDatasetContext for the parent dataset @@ -119,6 +119,11 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ constructor(context: SolidLdoDatasetContext) { super(); this.context = context; + this.notificationSubscription = new Websocket2023NotificationSubscription( + this, + this.onNotification.bind(this), + this.context, + ); } /** @@ -334,7 +339,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * ``` */ isSubscribedToNotifications(): boolean { - return !!this.notificationSubscription; + return this.notificationSubscription.isSubscribedToNotifications(); } /** @@ -735,7 +740,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * * @param onNotificationError - A callback function if there is an error * with notifications. - * @returns OpenSubscriptionResult + * @returns SubscriptionResult * * @example * ```typescript @@ -754,19 +759,22 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * ); * * // Subscribe - * const subscriptionResult = await testContainer.subscribeToNotifications(); + * const subscriptionResult = await testContainer.subscribeToNotifications( + * // These are optional callbacks. A subscription will automatically keep + * // the dataset in sync. Use these callbacks for additional functionality. + * onNotification: (message) => console.log(message), + * onNotificationError: (err) => console.log(err.message) + * ); * // ... From there you can ait for a file to be changed on the Pod. */ async subscribeToNotifications( + onNotification?: (message: NotificationMessage) => void, onNotificationError?: (err: Error) => void, - ): Promise { - this.notificationSubscription = new Websocket2023NotificationSubscription( - this, - this.onNotification.bind(this), + ): Promise { + return await this.notificationSubscription.subscribeToNotifications({ + onNotification, onNotificationError, - this.context, - ); - return await this.notificationSubscription.open(); + }); } /** @@ -802,22 +810,38 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ /** * Unsubscribes from changes made to this resource on the Pod * - * @returns CloseSubscriptionResult + * @returns UnsubscribeResult * * @example * ```typescript - * resource.unsubscribeFromNotifications() + * const subscriptionResult = await testContainer.subscribeToNotifications(); + * await testContainer.unsubscribeFromNotifications( + * subscriptionResult.subscriptionId + * ); * ``` */ - async unsubscribeFromNotifications(): Promise { - const result = await this.notificationSubscription?.close(); - this.notificationSubscription = undefined; - return ( - result ?? { - type: "unsubscribeFromNotificationSuccess", - isError: false, - uri: this.uri, - } + async unsubscribeFromNotifications( + subscriptionId: string, + ): Promise { + return this.notificationSubscription.unsubscribeFromNotification( + subscriptionId, ); } + + /** + * Unsubscribes from all notifications on this resource + * + * @returns UnsubscribeResult[] + * + * @example + * ```typescript + * const subscriptionResult = await testContainer.subscribeToNotifications(); + * await testContainer.unsubscribeFromNotifications( + * subscriptionResult.subscriptionId + * ); + * ``` + */ + async unsubscribeFromAllNotifications(): Promise { + return this.notificationSubscription.unsubscribeFromAllNotifications(); + } } diff --git a/packages/solid/src/resource/notifications/NotificationSubscription.ts b/packages/solid/src/resource/notifications/NotificationSubscription.ts index 87ede6d..1c9113e 100644 --- a/packages/solid/src/resource/notifications/NotificationSubscription.ts +++ b/packages/solid/src/resource/notifications/NotificationSubscription.ts @@ -2,50 +2,129 @@ import type { UnexpectedResourceError } from "../../requester/results/error/Erro import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext"; import type { Resource } from "../Resource"; import type { NotificationMessage } from "./NotificationMessage"; -import type { UnsupportedNotificationError } from "./results/NotificationErrors"; +import type { + NotificationCallbackError, + UnsupportedNotificationError, +} from "./results/NotificationErrors"; import type { SubscribeToNotificationSuccess } from "./results/SubscribeToNotificationSuccess"; import type { UnsubscribeToNotificationSuccess } from "./results/UnsubscribeFromNotificationSuccess"; +import { v4 } from "uuid"; -export type OpenSubscriptionResult = +export type SubscribeResult = | SubscribeToNotificationSuccess | UnsupportedNotificationError | UnexpectedResourceError; -export type CloseSubscriptionResult = +export type UnsubscribeResult = | UnsubscribeToNotificationSuccess | UnexpectedResourceError; +export type OpenResult = + | { type: "success" } + | UnsupportedNotificationError + | UnexpectedResourceError; + +export type CloseResult = { type: "success" } | UnexpectedResourceError; + +export interface SubscriptionCallbacks { + onNotification?: (message: NotificationMessage) => void; + // TODO: make notification errors more specific + onNotificationError?: (error: Error) => void; +} + /** * @internal * Abstract class for notification subscription methods. */ export abstract class NotificationSubscription { protected resource: Resource; - protected onNotification: (message: NotificationMessage) => void; - protected onError?: (err: Error) => void; + protected parentSubscription: (message: NotificationMessage) => void; protected context: SolidLdoDatasetContext; + protected subscriptions: Record = {}; + protected isOpen: boolean = false; constructor( resource: Resource, - onNotification: (message: NotificationMessage) => void, - onError: ((err: Error) => void) | undefined, + parentSubscription: (message: NotificationMessage) => void, context: SolidLdoDatasetContext, ) { this.resource = resource; - this.onNotification = onNotification; - this.onError = onError; + this.parentSubscription = parentSubscription; this.context = context; } + public isSubscribedToNotifications(): boolean { + return this.isOpen; + } + + protected onNotification(message: NotificationMessage): void { + this.parentSubscription(message); + Object.values(this.subscriptions).forEach(({ onNotification }) => { + onNotification?.(message); + }); + } + + protected onNotificationError(message: NotificationCallbackError): void { + Object.values(this.subscriptions).forEach(({ onNotificationError }) => { + onNotificationError?.(message); + }); + if (message.type === "disconnectedNotAttemptingReconnectError") { + this.isOpen = false; + } + } + + async subscribeToNotifications( + subscriptionCallbacks: SubscriptionCallbacks, + ): Promise { + if (!this.isOpen) { + const openResult = await this.open(); + if (openResult.type !== "success") return openResult; + this.isOpen = true; + } + const subscriptionId = v4(); + this.subscriptions[subscriptionId] = subscriptionCallbacks; + return { + isError: false, + type: "subscribeToNotificationSuccess", + uri: this.resource.uri, + subscriptionId, + }; + } + + async unsubscribeFromNotification( + subscriptionId: string, + ): Promise { + delete this.subscriptions[subscriptionId]; + if (Object.keys(this.subscriptions).length === 0) { + const closeResult = await this.close(); + if (closeResult.type !== "success") return closeResult; + this.isOpen = false; + } + return { + isError: false, + type: "unsubscribeFromNotificationSuccess", + uri: this.resource.uri, + subscriptionId, + }; + } + + async unsubscribeFromAllNotifications(): Promise { + return Promise.all( + Object.keys(this.subscriptions).map((id) => + this.unsubscribeFromNotification(id), + ), + ); + } + /** * @internal * Opens the subscription */ - abstract open(): Promise; + protected abstract open(): Promise; /** * @internal * Closes the subscription */ - abstract close(): Promise; + protected abstract close(): Promise; } diff --git a/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts b/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts index 5434b7b..420bd6e 100644 --- a/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts +++ b/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts @@ -1,12 +1,13 @@ import { UnexpectedResourceError } from "../../requester/results/error/ErrorResult"; -import type { - CloseSubscriptionResult, - OpenSubscriptionResult, -} from "./NotificationSubscription"; +import type { CloseResult, OpenResult } from "./NotificationSubscription"; import { NotificationSubscription } from "./NotificationSubscription"; import { SubscriptionClient } from "@solid-notifications/subscription"; import { WebSocket } from "ws"; -import { UnsupportedNotificationError } from "./results/NotificationErrors"; +import { + DisconnectedAttemptingReconnectError, + DisconnectedNotAttemptingReconnectError, + UnsupportedNotificationError, +} from "./results/NotificationErrors"; import type { NotificationMessage } from "./NotificationMessage"; import type { Resource } from "../Resource"; import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext"; @@ -22,18 +23,27 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript private socket: WebSocket | undefined; private createWebsocket: (address: string) => WebSocket; + // Reconnection data + // How often we should attempt a reconnection + private reconnectInterval = 5000; + // How many attempts have already been tried for a reconnection + private reconnectAttempts = 0; + // Whether or not the socket was manually closes + private isManualClose = false; + // Maximum number of attempts to reconnect + private maxReconnectAttempts = 10; + constructor( resource: Resource, - onNotification: (message: NotificationMessage) => void, - onError: ((err: Error) => void) | undefined, + parentSubscription: (message: NotificationMessage) => void, context: SolidLdoDatasetContext, createWebsocket?: (address: string) => WebSocket, ) { - super(resource, onNotification, onError, context); + super(resource, parentSubscription, context); this.createWebsocket = createWebsocket ?? createWebsocketDefault; } - async open(): Promise { + async open(): Promise { try { const notificationChannel = await this.discoverNotificationChannel(); return this.subscribeToWebsocket(notificationChannel); @@ -48,7 +58,7 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript } } - async discoverNotificationChannel(): Promise { + public async discoverNotificationChannel(): Promise { const client = new SubscriptionClient(this.context.fetch); return await client.subscribe( this.resource.uri, @@ -56,43 +66,75 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript ); } - async subscribeToWebsocket( + public async subscribeToWebsocket( notificationChannel: NotificationChannel, - ): Promise { - return new Promise((resolve) => { + ): Promise { + return new Promise((resolve) => { let didResolve = false; this.socket = this.createWebsocket( notificationChannel.receiveFrom as string, ); + + this.socket.onopen = () => { + this.reconnectAttempts = 0; // Reset attempts on successful connection + this.isManualClose = false; // Reset manual close flag + didResolve = true; + resolve({ + type: "success", + }); + }; + this.socket.onmessage = (message) => { const messageData = message.data.toString(); // TODO uncompliant Pod error on misformatted message this.onNotification(JSON.parse(messageData) as NotificationMessage); }; + + this.socket.onclose = this.onClose.bind(this); + this.socket.onerror = (err) => { if (!didResolve) { resolve(UnexpectedResourceError.fromThrown(this.resource.uri, err)); } else { - this.onError?.(err.error); + this.onNotificationError( + new UnexpectedResourceError(this.resource.uri, err.error), + ); } }; - this.socket.onopen = () => { - didResolve = true; - resolve({ - isError: false, - type: "subscribeToNotificationSuccess", - uri: this.resource.uri, - }); - }; }); } - async close(): Promise { + private onClose() { + if (!this.isManualClose) { + // Attempt to reconnect only if the disconnection was unintentional + if (this.reconnectAttempts < this.maxReconnectAttempts) { + this.reconnectAttempts++; + const backoffTime = Math.min( + this.reconnectInterval * this.reconnectAttempts, + 30000, + ); // Cap backoff at 30 seconds + setTimeout(this.open, backoffTime); + this.onNotificationError( + new DisconnectedAttemptingReconnectError( + this.resource.uri, + `Attempting to reconnect to Websocket for ${this.resource.uri}.`, + ), + ); + } else { + this.onNotificationError( + new DisconnectedNotAttemptingReconnectError( + this.resource.uri, + `Lost connection to websocket for ${this.resource.uri}.`, + ), + ); + } + } + } + + protected async close(): Promise { this.socket?.terminate(); return { - type: "unsubscribeFromNotificationSuccess", - isError: false, - uri: this.resource.uri, + type: "success", }; } } diff --git a/packages/solid/src/resource/notifications/results/NotificationErrors.ts b/packages/solid/src/resource/notifications/results/NotificationErrors.ts index 70382f7..dcde3bb 100644 --- a/packages/solid/src/resource/notifications/results/NotificationErrors.ts +++ b/packages/solid/src/resource/notifications/results/NotificationErrors.ts @@ -1,3 +1,4 @@ +import type { UnexpectedResourceError } from "../../../requester/results/error/ErrorResult"; import { ResourceError } from "../../../requester/results/error/ErrorResult"; /** @@ -7,3 +8,28 @@ import { ResourceError } from "../../../requester/results/error/ErrorResult"; export class UnsupportedNotificationError extends ResourceError { readonly type = "unsupportedNotificationError" as const; } + +/** + * ============================================================================= + * CALLBACK ERRORS + * ============================================================================= + */ + +export type NotificationCallbackError = + | DisconnectedAttemptingReconnectError + | DisconnectedNotAttemptingReconnectError + | UnexpectedResourceError; + +/** + * Indicates that the socket has disconnected and is attempting to reconnect. + */ +export class DisconnectedAttemptingReconnectError extends ResourceError { + readonly type = "disconnectedAttemptingReconnectError" as const; +} + +/** + * Indicates that the socket has disconnected and is attempting to reconnect. + */ +export class DisconnectedNotAttemptingReconnectError extends ResourceError { + readonly type = "disconnectedNotAttemptingReconnectError" as const; +} diff --git a/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts b/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts index fd3cfcb..6a95443 100644 --- a/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts +++ b/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts @@ -5,4 +5,5 @@ import type { ResourceSuccess } from "../../../requester/results/success/Success */ export interface SubscribeToNotificationSuccess extends ResourceSuccess { type: "subscribeToNotificationSuccess"; + subscriptionId: string; } diff --git a/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts b/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts index 5aa97e7..0b4c90c 100644 --- a/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts +++ b/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts @@ -5,4 +5,5 @@ import type { ResourceSuccess } from "../../../requester/results/success/Success */ export interface UnsubscribeToNotificationSuccess extends ResourceSuccess { type: "unsubscribeFromNotificationSuccess"; + subscriptionId: string; } diff --git a/packages/solid/test/Integration.test.ts b/packages/solid/test/Integration.test.ts index 22b60e7..3442aef 100644 --- a/packages/solid/test/Integration.test.ts +++ b/packages/solid/test/Integration.test.ts @@ -175,7 +175,7 @@ describe("Integration", () => { app.stop(); process.env.JEST_WORKER_ID = previousJestId; process.env.NODE_ENV = previousNodeEnv; - const testDataPath = path.join(__dirname, "../data"); + const testDataPath = path.join(__dirname, "./data"); await fs.rm(testDataPath, { recursive: true, force: true }); }); @@ -2037,6 +2037,7 @@ describe("Integration", () => { const subscriptionResult = await resource.subscribeToNotifications(); expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); + if (subscriptionResult.type !== "subscribeToNotificationSuccess") return; expect(resource.isSubscribedToNotifications()).toBe(true); @@ -2060,7 +2061,9 @@ describe("Integration", () => { // Notification is not propogated after unsubscribe spidermanCallback.mockClear(); - const unsubscribeResponse = await resource.unsubscribeFromNotifications(); + const unsubscribeResponse = await resource.unsubscribeFromNotifications( + subscriptionResult.subscriptionId, + ); expect(unsubscribeResponse.type).toBe( "unsubscribeFromNotificationSuccess", ); @@ -2116,7 +2119,7 @@ describe("Integration", () => { expect(spidermanCallback).toHaveBeenCalledTimes(1); expect(containerCallback).toHaveBeenCalledTimes(1); - await resource.unsubscribeFromNotifications(); + await resource.unsubscribeFromAllNotifications(); }); it("handles notification when subscribed to a parent with a deleted child", async () => { @@ -2151,7 +2154,7 @@ describe("Integration", () => { expect(spidermanCallback).toHaveBeenCalledTimes(1); expect(containerCallback).toHaveBeenCalledTimes(1); - await testContainer.unsubscribeFromNotifications(); + await testContainer.unsubscribeFromAllNotifications(); }); it("handles notification when subscribed to a parent with an added child", async () => { @@ -2190,7 +2193,7 @@ describe("Integration", () => { expect(spidermanCallback).toHaveBeenCalledTimes(1); expect(containerCallback).toHaveBeenCalledTimes(1); - await testContainer.unsubscribeFromNotifications(); + await testContainer.unsubscribeFromAllNotifications(); }); it("returns an error when it cannot subscribe to a notification", async () => { @@ -2222,8 +2225,8 @@ describe("Integration", () => { it("causes no problems when unsubscribing when not subscribed", async () => { const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI); - const result = await resource.unsubscribeFromNotifications(); - expect(result.type).toBe("unsubscribeFromNotificationSuccess"); + const result = await resource.unsubscribeFromAllNotifications(); + expect(result.length).toBe(0); }); }); }); diff --git a/packages/solid/test/Websocket2023NotificationSubscription.test.ts b/packages/solid/test/Websocket2023NotificationSubscription.test.ts index ddf92bb..9ddf0b5 100644 --- a/packages/solid/test/Websocket2023NotificationSubscription.test.ts +++ b/packages/solid/test/Websocket2023NotificationSubscription.test.ts @@ -7,14 +7,12 @@ import type { NotificationChannel } from "@solid-notifications/types"; describe("Websocket2023NotificationSubscription", () => { it("returns an error when websockets have an error", async () => { const WebSocketMock: WebSocket = {} as WebSocket; - const onErrorMock = jest.fn(); const subscription = new Websocket2023NotificationSubscription( new Leaf("https://example.com", { fetch, } as unknown as SolidLdoDatasetContext), () => {}, - onErrorMock, {} as unknown as SolidLdoDatasetContext, () => WebSocketMock, ); @@ -25,23 +23,19 @@ describe("Websocket2023NotificationSubscription", () => { WebSocketMock.onopen?.({} as Event); const subscriptionResult = await subPromise; - expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); + expect(subscriptionResult.type).toBe("success"); WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent); - - expect(onErrorMock).toHaveBeenCalled(); }); it("returns an error when websockets have an error at the beginning", async () => { const WebSocketMock: WebSocket = {} as WebSocket; - const onErrorMock = jest.fn(); const subscription = new Websocket2023NotificationSubscription( new Leaf("https://example.com", { fetch, } as unknown as SolidLdoDatasetContext), () => {}, - onErrorMock, {} as unknown as SolidLdoDatasetContext, () => WebSocketMock, );