From eae933b7f592bafc34833cdf89a8b27af1c80932 Mon Sep 17 00:00:00 2001 From: Jackson Morgan Date: Sun, 12 Jan 2025 20:37:51 -0500 Subject: [PATCH] Complete refactor for websocket --- packages/solid/src/resource/Resource.ts | 49 +++---- .../notifications/NotificationSubscription.ts | 138 +++++++++--------- .../Websocket2023NotificationSubscription.ts | 80 +++++----- .../results/NotificationErrors.ts | 17 +-- .../results/SubscribeToNotificationSuccess.ts | 9 -- .../UnsubscribeFromNotificationSuccess.ts | 9 -- packages/solid/test/Integration.test.ts | 62 +++++--- ...socket2023NotificationSubscription.test.ts | 7 +- 8 files changed, 172 insertions(+), 199 deletions(-) delete mode 100644 packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts delete mode 100644 packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts diff --git a/packages/solid/src/resource/Resource.ts b/packages/solid/src/resource/Resource.ts index c21dc89..79c6e1c 100644 --- a/packages/solid/src/resource/Resource.ts +++ b/packages/solid/src/resource/Resource.ts @@ -38,8 +38,7 @@ import type { LeafUri } from "../util/uriTypes"; import type { NoRootContainerError } from "../requester/results/error/NoRootContainerError"; import type { NotificationSubscription, - SubscribeResult, - UnsubscribeResult, + SubscriptionCallbacks, } from "./notifications/NotificationSubscription"; import { Websocket2023NotificationSubscription } from "./notifications/Websocket2023NotificationSubscription"; import type { NotificationMessage } from "./notifications/NotificationMessage"; @@ -329,13 +328,9 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * * @example * ```typescript - * // Logs "undefined" - * console.log(resource.isPresent()); - * const result = resource.read(); - * if (!result.isError) { - * // True if the resource exists, false if it does not - * console.log(resource.isPresent()); - * } + * await resource.subscribeToNotifications(); + * // Logs "true" + * console.log(resource.isSubscribedToNotifications()); * ``` */ isSubscribedToNotifications(): boolean { @@ -740,7 +735,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * * @param onNotificationError - A callback function if there is an error * with notifications. - * @returns SubscriptionResult + * @returns SubscriptionId: A string to use to unsubscribe * * @example * ```typescript @@ -759,22 +754,20 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * ); * * // Subscribe - * const subscriptionResult = await testContainer.subscribeToNotifications( + * const subscriptionId = await testContainer.subscribeToNotifications({ * // These are optional callbacks. A subscription will automatically keep * // the dataset in sync. Use these callbacks for additional functionality. * onNotification: (message) => console.log(message), * onNotificationError: (err) => console.log(err.message) - * ); - * // ... From there you can ait for a file to be changed on the Pod. + * }); + * // ... From there you can wait for a file to be changed on the Pod. */ async subscribeToNotifications( - onNotification?: (message: NotificationMessage) => void, - onNotificationError?: (err: Error) => void, - ): Promise { - return await this.notificationSubscription.subscribeToNotifications({ - onNotification, - onNotificationError, - }); + callbacks?: SubscriptionCallbacks, + ): Promise { + return await this.notificationSubscription.subscribeToNotifications( + callbacks, + ); } /** @@ -814,15 +807,11 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * * @example * ```typescript - * const subscriptionResult = await testContainer.subscribeToNotifications(); - * await testContainer.unsubscribeFromNotifications( - * subscriptionResult.subscriptionId - * ); + * const subscriptionId = await testContainer.subscribeToNotifications(); + * await testContainer.unsubscribeFromNotifications(subscriptionId); * ``` */ - async unsubscribeFromNotifications( - subscriptionId: string, - ): Promise { + async unsubscribeFromNotifications(subscriptionId: string): Promise { return this.notificationSubscription.unsubscribeFromNotification( subscriptionId, ); @@ -836,12 +825,10 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{ * @example * ```typescript * const subscriptionResult = await testContainer.subscribeToNotifications(); - * await testContainer.unsubscribeFromNotifications( - * subscriptionResult.subscriptionId - * ); + * await testContainer.unsubscribeFromAllNotifications(); * ``` */ - async unsubscribeFromAllNotifications(): Promise { + async unsubscribeFromAllNotifications(): Promise { return this.notificationSubscription.unsubscribeFromAllNotifications(); } } diff --git a/packages/solid/src/resource/notifications/NotificationSubscription.ts b/packages/solid/src/resource/notifications/NotificationSubscription.ts index 1c9113e..9e7d020 100644 --- a/packages/solid/src/resource/notifications/NotificationSubscription.ts +++ b/packages/solid/src/resource/notifications/NotificationSubscription.ts @@ -1,31 +1,9 @@ -import type { UnexpectedResourceError } from "../../requester/results/error/ErrorResult"; import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext"; import type { Resource } from "../Resource"; import type { NotificationMessage } from "./NotificationMessage"; -import type { - NotificationCallbackError, - UnsupportedNotificationError, -} from "./results/NotificationErrors"; -import type { SubscribeToNotificationSuccess } from "./results/SubscribeToNotificationSuccess"; -import type { UnsubscribeToNotificationSuccess } from "./results/UnsubscribeFromNotificationSuccess"; +import type { NotificationCallbackError } from "./results/NotificationErrors"; import { v4 } from "uuid"; -export type SubscribeResult = - | SubscribeToNotificationSuccess - | UnsupportedNotificationError - | UnexpectedResourceError; - -export type UnsubscribeResult = - | UnsubscribeToNotificationSuccess - | UnexpectedResourceError; - -export type OpenResult = - | { type: "success" } - | UnsupportedNotificationError - | UnexpectedResourceError; - -export type CloseResult = { type: "success" } | UnexpectedResourceError; - export interface SubscriptionCallbacks { onNotification?: (message: NotificationMessage) => void; // TODO: make notification errors more specific @@ -57,74 +35,100 @@ export abstract class NotificationSubscription { return this.isOpen; } - protected onNotification(message: NotificationMessage): void { - this.parentSubscription(message); - Object.values(this.subscriptions).forEach(({ onNotification }) => { - onNotification?.(message); - }); - } - - protected onNotificationError(message: NotificationCallbackError): void { - Object.values(this.subscriptions).forEach(({ onNotificationError }) => { - onNotificationError?.(message); - }); - if (message.type === "disconnectedNotAttemptingReconnectError") { - this.isOpen = false; - } - } + /** + * =========================================================================== + * PUBLIC + * =========================================================================== + */ + /** + * @internal + * subscribeToNotifications + */ async subscribeToNotifications( - subscriptionCallbacks: SubscriptionCallbacks, - ): Promise { + subscriptionCallbacks?: SubscriptionCallbacks, + ): Promise { + const subscriptionId = v4(); + this.subscriptions[subscriptionId] = subscriptionCallbacks ?? {}; if (!this.isOpen) { - const openResult = await this.open(); - if (openResult.type !== "success") return openResult; + await this.open(); this.isOpen = true; } - const subscriptionId = v4(); - this.subscriptions[subscriptionId] = subscriptionCallbacks; - return { - isError: false, - type: "subscribeToNotificationSuccess", - uri: this.resource.uri, - subscriptionId, - }; + return subscriptionId; } - async unsubscribeFromNotification( - subscriptionId: string, - ): Promise { - delete this.subscriptions[subscriptionId]; - if (Object.keys(this.subscriptions).length === 0) { - const closeResult = await this.close(); - if (closeResult.type !== "success") return closeResult; + /** + * @internal + * unsubscribeFromNotification + */ + async unsubscribeFromNotification(subscriptionId: string): Promise { + if ( + !!this.subscriptions[subscriptionId] && + Object.keys(this.subscriptions).length === 1 + ) { + await this.close(); this.isOpen = false; } - return { - isError: false, - type: "unsubscribeFromNotificationSuccess", - uri: this.resource.uri, - subscriptionId, - }; + delete this.subscriptions[subscriptionId]; } - async unsubscribeFromAllNotifications(): Promise { - return Promise.all( + /** + * @internal + * unsubscribeFromAllNotifications + */ + async unsubscribeFromAllNotifications(): Promise { + await Promise.all( Object.keys(this.subscriptions).map((id) => this.unsubscribeFromNotification(id), ), ); } + /** + * =========================================================================== + * HELPERS + * =========================================================================== + */ + /** * @internal * Opens the subscription */ - protected abstract open(): Promise; + protected abstract open(): Promise; /** * @internal * Closes the subscription */ - protected abstract close(): Promise; + protected abstract close(): Promise; + + /** + * =========================================================================== + * CALLBACKS + * =========================================================================== + */ + + /** + * @internal + * onNotification + */ + protected onNotification(message: NotificationMessage): void { + this.parentSubscription(message); + Object.values(this.subscriptions).forEach(({ onNotification }) => { + onNotification?.(message); + }); + } + + /** + * @internal + * onNotificationError + */ + protected onNotificationError(message: NotificationCallbackError): void { + Object.values(this.subscriptions).forEach(({ onNotificationError }) => { + onNotificationError?.(message); + }); + if (message.type === "disconnectedNotAttemptingReconnectError") { + this.isOpen = false; + } + } } diff --git a/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts b/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts index 420bd6e..80c51cd 100644 --- a/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts +++ b/packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts @@ -1,5 +1,4 @@ import { UnexpectedResourceError } from "../../requester/results/error/ErrorResult"; -import type { CloseResult, OpenResult } from "./NotificationSubscription"; import { NotificationSubscription } from "./NotificationSubscription"; import { SubscriptionClient } from "@solid-notifications/subscription"; import { WebSocket } from "ws"; @@ -31,7 +30,7 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript // Whether or not the socket was manually closes private isManualClose = false; // Maximum number of attempts to reconnect - private maxReconnectAttempts = 10; + private maxReconnectAttempts = 6; constructor( resource: Resource, @@ -43,18 +42,24 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript this.createWebsocket = createWebsocket ?? createWebsocketDefault; } - async open(): Promise { + async open(): Promise { try { const notificationChannel = await this.discoverNotificationChannel(); - return this.subscribeToWebsocket(notificationChannel); + await this.subscribeToWebsocket(notificationChannel); } catch (err) { if ( err instanceof Error && err.message.startsWith("Discovery did not succeed") ) { - return new UnsupportedNotificationError(this.resource.uri, err.message); + this.onNotificationError( + new UnsupportedNotificationError(this.resource.uri, err.message), + ); + } else { + this.onNotificationError( + UnexpectedResourceError.fromThrown(this.resource.uri, err), + ); } - return UnexpectedResourceError.fromThrown(this.resource.uri, err); + this.onClose(); } } @@ -68,40 +73,30 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript public async subscribeToWebsocket( notificationChannel: NotificationChannel, - ): Promise { - return new Promise((resolve) => { - let didResolve = false; - this.socket = this.createWebsocket( - notificationChannel.receiveFrom as string, - ); + ): Promise { + this.socket = this.createWebsocket( + notificationChannel.receiveFrom as string, + ); - this.socket.onopen = () => { - this.reconnectAttempts = 0; // Reset attempts on successful connection - this.isManualClose = false; // Reset manual close flag - didResolve = true; - resolve({ - type: "success", - }); - }; + this.socket.onopen = () => { + this.reconnectAttempts = 0; // Reset attempts on successful connection + this.isManualClose = false; // Reset manual close flag + }; - this.socket.onmessage = (message) => { - const messageData = message.data.toString(); - // TODO uncompliant Pod error on misformatted message - this.onNotification(JSON.parse(messageData) as NotificationMessage); - }; + this.socket.onmessage = (message) => { + const messageData = message.data.toString(); + // TODO uncompliant Pod error on misformatted message + this.onNotification(JSON.parse(messageData) as NotificationMessage); + }; - this.socket.onclose = this.onClose.bind(this); + this.socket.onclose = this.onClose.bind(this); - this.socket.onerror = (err) => { - if (!didResolve) { - resolve(UnexpectedResourceError.fromThrown(this.resource.uri, err)); - } else { - this.onNotificationError( - new UnexpectedResourceError(this.resource.uri, err.error), - ); - } - }; - }); + this.socket.onerror = (err) => { + this.onNotificationError( + new UnexpectedResourceError(this.resource.uri, err.error), + ); + }; + return; } private onClose() { @@ -109,11 +104,9 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript // Attempt to reconnect only if the disconnection was unintentional if (this.reconnectAttempts < this.maxReconnectAttempts) { this.reconnectAttempts++; - const backoffTime = Math.min( - this.reconnectInterval * this.reconnectAttempts, - 30000, - ); // Cap backoff at 30 seconds - setTimeout(this.open, backoffTime); + setTimeout(() => { + this.open(); + }, this.reconnectInterval); this.onNotificationError( new DisconnectedAttemptingReconnectError( this.resource.uri, @@ -131,11 +124,8 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript } } - protected async close(): Promise { + protected async close(): Promise { this.socket?.terminate(); - return { - type: "success", - }; } } diff --git a/packages/solid/src/resource/notifications/results/NotificationErrors.ts b/packages/solid/src/resource/notifications/results/NotificationErrors.ts index dcde3bb..f196c86 100644 --- a/packages/solid/src/resource/notifications/results/NotificationErrors.ts +++ b/packages/solid/src/resource/notifications/results/NotificationErrors.ts @@ -1,6 +1,12 @@ import type { UnexpectedResourceError } from "../../../requester/results/error/ErrorResult"; import { ResourceError } from "../../../requester/results/error/ErrorResult"; +export type NotificationCallbackError = + | DisconnectedAttemptingReconnectError + | DisconnectedNotAttemptingReconnectError + | UnsupportedNotificationError + | UnexpectedResourceError; + /** * Indicates that the requested method for receiving notifications is not * supported by this Pod. @@ -9,17 +15,6 @@ export class UnsupportedNotificationError extends ResourceError { readonly type = "unsupportedNotificationError" as const; } -/** - * ============================================================================= - * CALLBACK ERRORS - * ============================================================================= - */ - -export type NotificationCallbackError = - | DisconnectedAttemptingReconnectError - | DisconnectedNotAttemptingReconnectError - | UnexpectedResourceError; - /** * Indicates that the socket has disconnected and is attempting to reconnect. */ diff --git a/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts b/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts deleted file mode 100644 index 6a95443..0000000 --- a/packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts +++ /dev/null @@ -1,9 +0,0 @@ -import type { ResourceSuccess } from "../../../requester/results/success/SuccessResult"; - -/** - * Returned when a notification has been successfully subscribed to for a resource - */ -export interface SubscribeToNotificationSuccess extends ResourceSuccess { - type: "subscribeToNotificationSuccess"; - subscriptionId: string; -} diff --git a/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts b/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts deleted file mode 100644 index 0b4c90c..0000000 --- a/packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts +++ /dev/null @@ -1,9 +0,0 @@ -import type { ResourceSuccess } from "../../../requester/results/success/SuccessResult"; - -/** - * Returned when a notification has been successfully unsubscribed from for a resource - */ -export interface UnsubscribeToNotificationSuccess extends ResourceSuccess { - type: "unsubscribeFromNotificationSuccess"; - subscriptionId: string; -} diff --git a/packages/solid/test/Integration.test.ts b/packages/solid/test/Integration.test.ts index 3442aef..eda134b 100644 --- a/packages/solid/test/Integration.test.ts +++ b/packages/solid/test/Integration.test.ts @@ -2035,9 +2035,7 @@ describe("Integration", () => { spidermanCallback, ); - const subscriptionResult = await resource.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); - if (subscriptionResult.type !== "subscribeToNotificationSuccess") return; + const subscriptionId = await resource.subscribeToNotifications(); expect(resource.isSubscribedToNotifications()).toBe(true); @@ -2061,12 +2059,7 @@ describe("Integration", () => { // Notification is not propogated after unsubscribe spidermanCallback.mockClear(); - const unsubscribeResponse = await resource.unsubscribeFromNotifications( - subscriptionResult.subscriptionId, - ); - expect(unsubscribeResponse.type).toBe( - "unsubscribeFromNotificationSuccess", - ); + await resource.unsubscribeFromNotifications(subscriptionId); expect(resource.isSubscribedToNotifications()).toBe(false); await authFetch(SAMPLE_DATA_URI, { method: "PATCH", @@ -2104,8 +2097,7 @@ describe("Integration", () => { containerCallback, ); - const subscriptionResult = await resource.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); + await resource.subscribeToNotifications(); await authFetch(SAMPLE_DATA_URI, { method: "DELETE", @@ -2139,8 +2131,7 @@ describe("Integration", () => { containerCallback, ); - const subscriptionResult = await testContainer.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); + await testContainer.subscribeToNotifications(); await authFetch(SAMPLE_DATA_URI, { method: "DELETE", @@ -2174,8 +2165,7 @@ describe("Integration", () => { containerCallback, ); - const subscriptionResult = await testContainer.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess"); + await testContainer.subscribeToNotifications(); await authFetch(TEST_CONTAINER_URI, { method: "POST", @@ -2198,17 +2188,34 @@ describe("Integration", () => { it("returns an error when it cannot subscribe to a notification", async () => { const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI); + const onError = jest.fn(); await app.stop(); + await resource.subscribeToNotifications({ onNotificationError: onError }); + expect(onError).toHaveBeenCalledTimes(2); + await app.start(); + }); - const subscriptionResult = await resource.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("unexpectedResourceError"); + it("returns an error when the server doesnt support websockets", async () => { + const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI); + const onError = jest.fn(); + await app.stop(); + const disabledWebsocketsApp = await createApp( + path.join(__dirname, "./configs/server-config-without-websocket.json"), + ); + await disabledWebsocketsApp.start(); + + await resource.subscribeToNotifications({ onNotificationError: onError }); + expect(onError).toHaveBeenCalledTimes(2); + + await disabledWebsocketsApp.stop(); await app.start(); }); - it("returns an error when the server doesnt support websockets", async () => { + it("attempts to reconnect multiple times before giving up.", async () => { const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI); + const onError = jest.fn(); await app.stop(); const disabledWebsocketsApp = await createApp( @@ -2216,8 +2223,19 @@ describe("Integration", () => { ); await disabledWebsocketsApp.start(); - const subscriptionResult = await resource.subscribeToNotifications(); - expect(subscriptionResult.type).toBe("unsupportedNotificationError"); + await resource.subscribeToNotifications({ onNotificationError: onError }); + + // TODO: This is a bad test because of the wait. Instead inject better + // numbers into the websocket class. + await wait(35000); + + expect(onError).toHaveBeenCalledTimes(14); + expect(onError.mock.calls[1][0].type).toBe( + "disconnectedAttemptingReconnectError", + ); + expect(onError.mock.calls[13][0].type).toBe( + "disconnectedNotAttemptingReconnectError", + ); await disabledWebsocketsApp.stop(); await app.start(); @@ -2225,8 +2243,8 @@ describe("Integration", () => { it("causes no problems when unsubscribing when not subscribed", async () => { const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI); - const result = await resource.unsubscribeFromAllNotifications(); - expect(result.length).toBe(0); + await resource.unsubscribeFromAllNotifications(); + expect(resource.isSubscribedToNotifications()).toBe(false); }); }); }); diff --git a/packages/solid/test/Websocket2023NotificationSubscription.test.ts b/packages/solid/test/Websocket2023NotificationSubscription.test.ts index 9ddf0b5..1f97b0d 100644 --- a/packages/solid/test/Websocket2023NotificationSubscription.test.ts +++ b/packages/solid/test/Websocket2023NotificationSubscription.test.ts @@ -22,8 +22,7 @@ describe("Websocket2023NotificationSubscription", () => { } as unknown as NotificationChannel); WebSocketMock.onopen?.({} as Event); - const subscriptionResult = await subPromise; - expect(subscriptionResult.type).toBe("success"); + await subPromise; WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent); }); @@ -44,8 +43,6 @@ describe("Websocket2023NotificationSubscription", () => { receiveFrom: "http://example.com", } as unknown as NotificationChannel); WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent); - const subscriptionResult = await subPromise; - - expect(subscriptionResult.type).toBe("unexpectedResourceError"); + await subPromise; }); });