Complete refactor for websocket

main
Jackson Morgan 8 months ago
parent e2589eb85f
commit eae933b7f5
  1. 49
      packages/solid/src/resource/Resource.ts
  2. 138
      packages/solid/src/resource/notifications/NotificationSubscription.ts
  3. 80
      packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts
  4. 17
      packages/solid/src/resource/notifications/results/NotificationErrors.ts
  5. 9
      packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts
  6. 9
      packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts
  7. 62
      packages/solid/test/Integration.test.ts
  8. 7
      packages/solid/test/Websocket2023NotificationSubscription.test.ts

@ -38,8 +38,7 @@ import type { LeafUri } from "../util/uriTypes";
import type { NoRootContainerError } from "../requester/results/error/NoRootContainerError";
import type {
NotificationSubscription,
SubscribeResult,
UnsubscribeResult,
SubscriptionCallbacks,
} from "./notifications/NotificationSubscription";
import { Websocket2023NotificationSubscription } from "./notifications/Websocket2023NotificationSubscription";
import type { NotificationMessage } from "./notifications/NotificationMessage";
@ -329,13 +328,9 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
*
* @example
* ```typescript
* // Logs "undefined"
* console.log(resource.isPresent());
* const result = resource.read();
* if (!result.isError) {
* // True if the resource exists, false if it does not
* console.log(resource.isPresent());
* }
* await resource.subscribeToNotifications();
* // Logs "true"
* console.log(resource.isSubscribedToNotifications());
* ```
*/
isSubscribedToNotifications(): boolean {
@ -740,7 +735,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
*
* @param onNotificationError - A callback function if there is an error
* with notifications.
* @returns SubscriptionResult
* @returns SubscriptionId: A string to use to unsubscribe
*
* @example
* ```typescript
@ -759,22 +754,20 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
* );
*
* // Subscribe
* const subscriptionResult = await testContainer.subscribeToNotifications(
* const subscriptionId = await testContainer.subscribeToNotifications({
* // These are optional callbacks. A subscription will automatically keep
* // the dataset in sync. Use these callbacks for additional functionality.
* onNotification: (message) => console.log(message),
* onNotificationError: (err) => console.log(err.message)
* );
* // ... From there you can ait for a file to be changed on the Pod.
* });
* // ... From there you can wait for a file to be changed on the Pod.
*/
async subscribeToNotifications(
onNotification?: (message: NotificationMessage) => void,
onNotificationError?: (err: Error) => void,
): Promise<SubscribeResult> {
return await this.notificationSubscription.subscribeToNotifications({
onNotification,
onNotificationError,
});
callbacks?: SubscriptionCallbacks,
): Promise<string> {
return await this.notificationSubscription.subscribeToNotifications(
callbacks,
);
}
/**
@ -814,15 +807,11 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
*
* @example
* ```typescript
* const subscriptionResult = await testContainer.subscribeToNotifications();
* await testContainer.unsubscribeFromNotifications(
* subscriptionResult.subscriptionId
* );
* const subscriptionId = await testContainer.subscribeToNotifications();
* await testContainer.unsubscribeFromNotifications(subscriptionId);
* ```
*/
async unsubscribeFromNotifications(
subscriptionId: string,
): Promise<UnsubscribeResult> {
async unsubscribeFromNotifications(subscriptionId: string): Promise<void> {
return this.notificationSubscription.unsubscribeFromNotification(
subscriptionId,
);
@ -836,12 +825,10 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
* @example
* ```typescript
* const subscriptionResult = await testContainer.subscribeToNotifications();
* await testContainer.unsubscribeFromNotifications(
* subscriptionResult.subscriptionId
* );
* await testContainer.unsubscribeFromAllNotifications();
* ```
*/
async unsubscribeFromAllNotifications(): Promise<UnsubscribeResult[]> {
async unsubscribeFromAllNotifications(): Promise<void> {
return this.notificationSubscription.unsubscribeFromAllNotifications();
}
}

@ -1,31 +1,9 @@
import type { UnexpectedResourceError } from "../../requester/results/error/ErrorResult";
import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext";
import type { Resource } from "../Resource";
import type { NotificationMessage } from "./NotificationMessage";
import type {
NotificationCallbackError,
UnsupportedNotificationError,
} from "./results/NotificationErrors";
import type { SubscribeToNotificationSuccess } from "./results/SubscribeToNotificationSuccess";
import type { UnsubscribeToNotificationSuccess } from "./results/UnsubscribeFromNotificationSuccess";
import type { NotificationCallbackError } from "./results/NotificationErrors";
import { v4 } from "uuid";
export type SubscribeResult =
| SubscribeToNotificationSuccess
| UnsupportedNotificationError
| UnexpectedResourceError;
export type UnsubscribeResult =
| UnsubscribeToNotificationSuccess
| UnexpectedResourceError;
export type OpenResult =
| { type: "success" }
| UnsupportedNotificationError
| UnexpectedResourceError;
export type CloseResult = { type: "success" } | UnexpectedResourceError;
export interface SubscriptionCallbacks {
onNotification?: (message: NotificationMessage) => void;
// TODO: make notification errors more specific
@ -57,74 +35,100 @@ export abstract class NotificationSubscription {
return this.isOpen;
}
protected onNotification(message: NotificationMessage): void {
this.parentSubscription(message);
Object.values(this.subscriptions).forEach(({ onNotification }) => {
onNotification?.(message);
});
}
protected onNotificationError(message: NotificationCallbackError): void {
Object.values(this.subscriptions).forEach(({ onNotificationError }) => {
onNotificationError?.(message);
});
if (message.type === "disconnectedNotAttemptingReconnectError") {
this.isOpen = false;
}
}
/**
* ===========================================================================
* PUBLIC
* ===========================================================================
*/
/**
* @internal
* subscribeToNotifications
*/
async subscribeToNotifications(
subscriptionCallbacks: SubscriptionCallbacks,
): Promise<SubscribeResult> {
subscriptionCallbacks?: SubscriptionCallbacks,
): Promise<string> {
const subscriptionId = v4();
this.subscriptions[subscriptionId] = subscriptionCallbacks ?? {};
if (!this.isOpen) {
const openResult = await this.open();
if (openResult.type !== "success") return openResult;
await this.open();
this.isOpen = true;
}
const subscriptionId = v4();
this.subscriptions[subscriptionId] = subscriptionCallbacks;
return {
isError: false,
type: "subscribeToNotificationSuccess",
uri: this.resource.uri,
subscriptionId,
};
return subscriptionId;
}
async unsubscribeFromNotification(
subscriptionId: string,
): Promise<UnsubscribeResult> {
delete this.subscriptions[subscriptionId];
if (Object.keys(this.subscriptions).length === 0) {
const closeResult = await this.close();
if (closeResult.type !== "success") return closeResult;
/**
* @internal
* unsubscribeFromNotification
*/
async unsubscribeFromNotification(subscriptionId: string): Promise<void> {
if (
!!this.subscriptions[subscriptionId] &&
Object.keys(this.subscriptions).length === 1
) {
await this.close();
this.isOpen = false;
}
return {
isError: false,
type: "unsubscribeFromNotificationSuccess",
uri: this.resource.uri,
subscriptionId,
};
delete this.subscriptions[subscriptionId];
}
async unsubscribeFromAllNotifications(): Promise<UnsubscribeResult[]> {
return Promise.all(
/**
* @internal
* unsubscribeFromAllNotifications
*/
async unsubscribeFromAllNotifications(): Promise<void> {
await Promise.all(
Object.keys(this.subscriptions).map((id) =>
this.unsubscribeFromNotification(id),
),
);
}
/**
* ===========================================================================
* HELPERS
* ===========================================================================
*/
/**
* @internal
* Opens the subscription
*/
protected abstract open(): Promise<OpenResult>;
protected abstract open(): Promise<void>;
/**
* @internal
* Closes the subscription
*/
protected abstract close(): Promise<CloseResult>;
protected abstract close(): Promise<void>;
/**
* ===========================================================================
* CALLBACKS
* ===========================================================================
*/
/**
* @internal
* onNotification
*/
protected onNotification(message: NotificationMessage): void {
this.parentSubscription(message);
Object.values(this.subscriptions).forEach(({ onNotification }) => {
onNotification?.(message);
});
}
/**
* @internal
* onNotificationError
*/
protected onNotificationError(message: NotificationCallbackError): void {
Object.values(this.subscriptions).forEach(({ onNotificationError }) => {
onNotificationError?.(message);
});
if (message.type === "disconnectedNotAttemptingReconnectError") {
this.isOpen = false;
}
}
}

@ -1,5 +1,4 @@
import { UnexpectedResourceError } from "../../requester/results/error/ErrorResult";
import type { CloseResult, OpenResult } from "./NotificationSubscription";
import { NotificationSubscription } from "./NotificationSubscription";
import { SubscriptionClient } from "@solid-notifications/subscription";
import { WebSocket } from "ws";
@ -31,7 +30,7 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
// Whether or not the socket was manually closes
private isManualClose = false;
// Maximum number of attempts to reconnect
private maxReconnectAttempts = 10;
private maxReconnectAttempts = 6;
constructor(
resource: Resource,
@ -43,18 +42,24 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
this.createWebsocket = createWebsocket ?? createWebsocketDefault;
}
async open(): Promise<OpenResult> {
async open(): Promise<void> {
try {
const notificationChannel = await this.discoverNotificationChannel();
return this.subscribeToWebsocket(notificationChannel);
await this.subscribeToWebsocket(notificationChannel);
} catch (err) {
if (
err instanceof Error &&
err.message.startsWith("Discovery did not succeed")
) {
return new UnsupportedNotificationError(this.resource.uri, err.message);
this.onNotificationError(
new UnsupportedNotificationError(this.resource.uri, err.message),
);
} else {
this.onNotificationError(
UnexpectedResourceError.fromThrown(this.resource.uri, err),
);
}
return UnexpectedResourceError.fromThrown(this.resource.uri, err);
this.onClose();
}
}
@ -68,40 +73,30 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
public async subscribeToWebsocket(
notificationChannel: NotificationChannel,
): Promise<OpenResult> {
return new Promise<OpenResult>((resolve) => {
let didResolve = false;
this.socket = this.createWebsocket(
notificationChannel.receiveFrom as string,
);
): Promise<void> {
this.socket = this.createWebsocket(
notificationChannel.receiveFrom as string,
);
this.socket.onopen = () => {
this.reconnectAttempts = 0; // Reset attempts on successful connection
this.isManualClose = false; // Reset manual close flag
didResolve = true;
resolve({
type: "success",
});
};
this.socket.onopen = () => {
this.reconnectAttempts = 0; // Reset attempts on successful connection
this.isManualClose = false; // Reset manual close flag
};
this.socket.onmessage = (message) => {
const messageData = message.data.toString();
// TODO uncompliant Pod error on misformatted message
this.onNotification(JSON.parse(messageData) as NotificationMessage);
};
this.socket.onmessage = (message) => {
const messageData = message.data.toString();
// TODO uncompliant Pod error on misformatted message
this.onNotification(JSON.parse(messageData) as NotificationMessage);
};
this.socket.onclose = this.onClose.bind(this);
this.socket.onclose = this.onClose.bind(this);
this.socket.onerror = (err) => {
if (!didResolve) {
resolve(UnexpectedResourceError.fromThrown(this.resource.uri, err));
} else {
this.onNotificationError(
new UnexpectedResourceError(this.resource.uri, err.error),
);
}
};
});
this.socket.onerror = (err) => {
this.onNotificationError(
new UnexpectedResourceError(this.resource.uri, err.error),
);
};
return;
}
private onClose() {
@ -109,11 +104,9 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
// Attempt to reconnect only if the disconnection was unintentional
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const backoffTime = Math.min(
this.reconnectInterval * this.reconnectAttempts,
30000,
); // Cap backoff at 30 seconds
setTimeout(this.open, backoffTime);
setTimeout(() => {
this.open();
}, this.reconnectInterval);
this.onNotificationError(
new DisconnectedAttemptingReconnectError(
this.resource.uri,
@ -131,11 +124,8 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
}
}
protected async close(): Promise<CloseResult> {
protected async close(): Promise<void> {
this.socket?.terminate();
return {
type: "success",
};
}
}

@ -1,6 +1,12 @@
import type { UnexpectedResourceError } from "../../../requester/results/error/ErrorResult";
import { ResourceError } from "../../../requester/results/error/ErrorResult";
export type NotificationCallbackError =
| DisconnectedAttemptingReconnectError
| DisconnectedNotAttemptingReconnectError
| UnsupportedNotificationError
| UnexpectedResourceError;
/**
* Indicates that the requested method for receiving notifications is not
* supported by this Pod.
@ -9,17 +15,6 @@ export class UnsupportedNotificationError extends ResourceError {
readonly type = "unsupportedNotificationError" as const;
}
/**
* =============================================================================
* CALLBACK ERRORS
* =============================================================================
*/
export type NotificationCallbackError =
| DisconnectedAttemptingReconnectError
| DisconnectedNotAttemptingReconnectError
| UnexpectedResourceError;
/**
* Indicates that the socket has disconnected and is attempting to reconnect.
*/

@ -1,9 +0,0 @@
import type { ResourceSuccess } from "../../../requester/results/success/SuccessResult";
/**
* Returned when a notification has been successfully subscribed to for a resource
*/
export interface SubscribeToNotificationSuccess extends ResourceSuccess {
type: "subscribeToNotificationSuccess";
subscriptionId: string;
}

@ -1,9 +0,0 @@
import type { ResourceSuccess } from "../../../requester/results/success/SuccessResult";
/**
* Returned when a notification has been successfully unsubscribed from for a resource
*/
export interface UnsubscribeToNotificationSuccess extends ResourceSuccess {
type: "unsubscribeFromNotificationSuccess";
subscriptionId: string;
}

@ -2035,9 +2035,7 @@ describe("Integration", () => {
spidermanCallback,
);
const subscriptionResult = await resource.subscribeToNotifications();
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
if (subscriptionResult.type !== "subscribeToNotificationSuccess") return;
const subscriptionId = await resource.subscribeToNotifications();
expect(resource.isSubscribedToNotifications()).toBe(true);
@ -2061,12 +2059,7 @@ describe("Integration", () => {
// Notification is not propogated after unsubscribe
spidermanCallback.mockClear();
const unsubscribeResponse = await resource.unsubscribeFromNotifications(
subscriptionResult.subscriptionId,
);
expect(unsubscribeResponse.type).toBe(
"unsubscribeFromNotificationSuccess",
);
await resource.unsubscribeFromNotifications(subscriptionId);
expect(resource.isSubscribedToNotifications()).toBe(false);
await authFetch(SAMPLE_DATA_URI, {
method: "PATCH",
@ -2104,8 +2097,7 @@ describe("Integration", () => {
containerCallback,
);
const subscriptionResult = await resource.subscribeToNotifications();
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
await resource.subscribeToNotifications();
await authFetch(SAMPLE_DATA_URI, {
method: "DELETE",
@ -2139,8 +2131,7 @@ describe("Integration", () => {
containerCallback,
);
const subscriptionResult = await testContainer.subscribeToNotifications();
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
await testContainer.subscribeToNotifications();
await authFetch(SAMPLE_DATA_URI, {
method: "DELETE",
@ -2174,8 +2165,7 @@ describe("Integration", () => {
containerCallback,
);
const subscriptionResult = await testContainer.subscribeToNotifications();
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
await testContainer.subscribeToNotifications();
await authFetch(TEST_CONTAINER_URI, {
method: "POST",
@ -2198,17 +2188,34 @@ describe("Integration", () => {
it("returns an error when it cannot subscribe to a notification", async () => {
const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI);
const onError = jest.fn();
await app.stop();
await resource.subscribeToNotifications({ onNotificationError: onError });
expect(onError).toHaveBeenCalledTimes(2);
await app.start();
});
const subscriptionResult = await resource.subscribeToNotifications();
expect(subscriptionResult.type).toBe("unexpectedResourceError");
it("returns an error when the server doesnt support websockets", async () => {
const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI);
const onError = jest.fn();
await app.stop();
const disabledWebsocketsApp = await createApp(
path.join(__dirname, "./configs/server-config-without-websocket.json"),
);
await disabledWebsocketsApp.start();
await resource.subscribeToNotifications({ onNotificationError: onError });
expect(onError).toHaveBeenCalledTimes(2);
await disabledWebsocketsApp.stop();
await app.start();
});
it("returns an error when the server doesnt support websockets", async () => {
it("attempts to reconnect multiple times before giving up.", async () => {
const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI);
const onError = jest.fn();
await app.stop();
const disabledWebsocketsApp = await createApp(
@ -2216,8 +2223,19 @@ describe("Integration", () => {
);
await disabledWebsocketsApp.start();
const subscriptionResult = await resource.subscribeToNotifications();
expect(subscriptionResult.type).toBe("unsupportedNotificationError");
await resource.subscribeToNotifications({ onNotificationError: onError });
// TODO: This is a bad test because of the wait. Instead inject better
// numbers into the websocket class.
await wait(35000);
expect(onError).toHaveBeenCalledTimes(14);
expect(onError.mock.calls[1][0].type).toBe(
"disconnectedAttemptingReconnectError",
);
expect(onError.mock.calls[13][0].type).toBe(
"disconnectedNotAttemptingReconnectError",
);
await disabledWebsocketsApp.stop();
await app.start();
@ -2225,8 +2243,8 @@ describe("Integration", () => {
it("causes no problems when unsubscribing when not subscribed", async () => {
const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI);
const result = await resource.unsubscribeFromAllNotifications();
expect(result.length).toBe(0);
await resource.unsubscribeFromAllNotifications();
expect(resource.isSubscribedToNotifications()).toBe(false);
});
});
});

@ -22,8 +22,7 @@ describe("Websocket2023NotificationSubscription", () => {
} as unknown as NotificationChannel);
WebSocketMock.onopen?.({} as Event);
const subscriptionResult = await subPromise;
expect(subscriptionResult.type).toBe("success");
await subPromise;
WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent);
});
@ -44,8 +43,6 @@ describe("Websocket2023NotificationSubscription", () => {
receiveFrom: "http://example.com",
} as unknown as NotificationChannel);
WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent);
const subscriptionResult = await subPromise;
expect(subscriptionResult.type).toBe("unexpectedResourceError");
await subPromise;
});
});

Loading…
Cancel
Save