Before refactor to remove return from open and close

main
Jackson Morgan 8 months ago
parent 6915dcadce
commit e2589eb85f
  1. 0
      packages/solid-type-index/src/react/useProfile.ts
  2. 0
      packages/solid-type-index/src/react/useType.ts
  3. 31
      packages/solid-type-index/src/react/useTypeIndex.ts
  4. 10
      packages/solid-type-index/src/react/useTypeIndexProfile.ts
  5. 72
      packages/solid/src/resource/Resource.ts
  6. 101
      packages/solid/src/resource/notifications/NotificationSubscription.ts
  7. 94
      packages/solid/src/resource/notifications/Websocket2023NotificationSubscription.ts
  8. 26
      packages/solid/src/resource/notifications/results/NotificationErrors.ts
  9. 1
      packages/solid/src/resource/notifications/results/SubscribeToNotificationSuccess.ts
  10. 1
      packages/solid/src/resource/notifications/results/UnsubscribeFromNotificationSuccess.ts
  11. 17
      packages/solid/test/Integration.test.ts
  12. 8
      packages/solid/test/Websocket2023NotificationSubscription.test.ts

@ -0,0 +1,31 @@
import type { LeafUri } from "@ldo/solid";
import { useTypeIndexProfile } from "./useTypeIndexProfile";
import { useEffect, useMemo } from "react";
import { useLdo } from "@ldo/solid-react";
export function useTypeIndex(classUri: string): Promise<LeafUri[]> {
const { dataset } = useLdo();
const profile = useTypeIndexProfile();
const typeIndexUris: string[] = useMemo(() => {
const uris: string[] = [];
profile?.privateTypeIndex?.forEach((indexNode) => {
uris.push(indexNode["@id"]);
});
profile?.publicTypeIndex?.forEach((indexNode) => {
uris.push(indexNode["@id"]);
});
}, [profile]);
useEffect(() => {
const resources = typeIndexUris.map((uri) => dataset.getResource(uri));
resources.forEach((resource) => {
resource.readIfUnfetched();
resource.subscribeToNotifications();
});
return () => {
resources.forEach((resource) => resource.unsubscribeFromNotifications());
}
}, [typeIndexUris]);
}

@ -0,0 +1,10 @@
import { useResource, useSolidAuth, useSubject } from "@ldo/solid-react";
import type { TypeIndexProfile } from "../.ldo/profile.typings";
import { TypeIndexProfileShapeType } from "../.ldo/profile.shapeTypes";
export function useTypeIndexProfile(): TypeIndexProfile | undefined {
const { session } = useSolidAuth();
useResource(session.webId, { subscribe: true });
const profile = useSubject(TypeIndexProfileShapeType, session.webId);
return profile;
}

@ -37,9 +37,9 @@ import { setWacRuleForAclUri, type SetWacRuleResult } from "./wac/setWacRule";
import type { LeafUri } from "../util/uriTypes";
import type { NoRootContainerError } from "../requester/results/error/NoRootContainerError";
import type {
CloseSubscriptionResult,
NotificationSubscription,
OpenSubscriptionResult,
SubscribeResult,
UnsubscribeResult,
} from "./notifications/NotificationSubscription";
import { Websocket2023NotificationSubscription } from "./notifications/Websocket2023NotificationSubscription";
import type { NotificationMessage } from "./notifications/NotificationMessage";
@ -111,7 +111,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
* @internal
* Handles notification subscriptions
*/
protected notificationSubscription?: NotificationSubscription;
protected notificationSubscription: NotificationSubscription;
/**
* @param context - SolidLdoDatasetContext for the parent dataset
@ -119,6 +119,11 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
constructor(context: SolidLdoDatasetContext) {
super();
this.context = context;
this.notificationSubscription = new Websocket2023NotificationSubscription(
this,
this.onNotification.bind(this),
this.context,
);
}
/**
@ -334,7 +339,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
* ```
*/
isSubscribedToNotifications(): boolean {
return !!this.notificationSubscription;
return this.notificationSubscription.isSubscribedToNotifications();
}
/**
@ -735,7 +740,7 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
*
* @param onNotificationError - A callback function if there is an error
* with notifications.
* @returns OpenSubscriptionResult
* @returns SubscriptionResult
*
* @example
* ```typescript
@ -754,19 +759,22 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
* );
*
* // Subscribe
* const subscriptionResult = await testContainer.subscribeToNotifications();
* const subscriptionResult = await testContainer.subscribeToNotifications(
* // These are optional callbacks. A subscription will automatically keep
* // the dataset in sync. Use these callbacks for additional functionality.
* onNotification: (message) => console.log(message),
* onNotificationError: (err) => console.log(err.message)
* );
* // ... From there you can ait for a file to be changed on the Pod.
*/
async subscribeToNotifications(
onNotification?: (message: NotificationMessage) => void,
onNotificationError?: (err: Error) => void,
): Promise<OpenSubscriptionResult> {
this.notificationSubscription = new Websocket2023NotificationSubscription(
this,
this.onNotification.bind(this),
): Promise<SubscribeResult> {
return await this.notificationSubscription.subscribeToNotifications({
onNotification,
onNotificationError,
this.context,
);
return await this.notificationSubscription.open();
});
}
/**
@ -802,22 +810,38 @@ export abstract class Resource extends (EventEmitter as new () => TypedEmitter<{
/**
* Unsubscribes from changes made to this resource on the Pod
*
* @returns CloseSubscriptionResult
* @returns UnsubscribeResult
*
* @example
* ```typescript
* resource.unsubscribeFromNotifications()
* const subscriptionResult = await testContainer.subscribeToNotifications();
* await testContainer.unsubscribeFromNotifications(
* subscriptionResult.subscriptionId
* );
* ```
*/
async unsubscribeFromNotifications(): Promise<CloseSubscriptionResult> {
const result = await this.notificationSubscription?.close();
this.notificationSubscription = undefined;
return (
result ?? {
type: "unsubscribeFromNotificationSuccess",
isError: false,
uri: this.uri,
}
async unsubscribeFromNotifications(
subscriptionId: string,
): Promise<UnsubscribeResult> {
return this.notificationSubscription.unsubscribeFromNotification(
subscriptionId,
);
}
/**
* Unsubscribes from all notifications on this resource
*
* @returns UnsubscribeResult[]
*
* @example
* ```typescript
* const subscriptionResult = await testContainer.subscribeToNotifications();
* await testContainer.unsubscribeFromNotifications(
* subscriptionResult.subscriptionId
* );
* ```
*/
async unsubscribeFromAllNotifications(): Promise<UnsubscribeResult[]> {
return this.notificationSubscription.unsubscribeFromAllNotifications();
}
}

@ -2,50 +2,129 @@ import type { UnexpectedResourceError } from "../../requester/results/error/Erro
import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext";
import type { Resource } from "../Resource";
import type { NotificationMessage } from "./NotificationMessage";
import type { UnsupportedNotificationError } from "./results/NotificationErrors";
import type {
NotificationCallbackError,
UnsupportedNotificationError,
} from "./results/NotificationErrors";
import type { SubscribeToNotificationSuccess } from "./results/SubscribeToNotificationSuccess";
import type { UnsubscribeToNotificationSuccess } from "./results/UnsubscribeFromNotificationSuccess";
import { v4 } from "uuid";
export type OpenSubscriptionResult =
export type SubscribeResult =
| SubscribeToNotificationSuccess
| UnsupportedNotificationError
| UnexpectedResourceError;
export type CloseSubscriptionResult =
export type UnsubscribeResult =
| UnsubscribeToNotificationSuccess
| UnexpectedResourceError;
export type OpenResult =
| { type: "success" }
| UnsupportedNotificationError
| UnexpectedResourceError;
export type CloseResult = { type: "success" } | UnexpectedResourceError;
export interface SubscriptionCallbacks {
onNotification?: (message: NotificationMessage) => void;
// TODO: make notification errors more specific
onNotificationError?: (error: Error) => void;
}
/**
* @internal
* Abstract class for notification subscription methods.
*/
export abstract class NotificationSubscription {
protected resource: Resource;
protected onNotification: (message: NotificationMessage) => void;
protected onError?: (err: Error) => void;
protected parentSubscription: (message: NotificationMessage) => void;
protected context: SolidLdoDatasetContext;
protected subscriptions: Record<string, SubscriptionCallbacks> = {};
protected isOpen: boolean = false;
constructor(
resource: Resource,
onNotification: (message: NotificationMessage) => void,
onError: ((err: Error) => void) | undefined,
parentSubscription: (message: NotificationMessage) => void,
context: SolidLdoDatasetContext,
) {
this.resource = resource;
this.onNotification = onNotification;
this.onError = onError;
this.parentSubscription = parentSubscription;
this.context = context;
}
public isSubscribedToNotifications(): boolean {
return this.isOpen;
}
protected onNotification(message: NotificationMessage): void {
this.parentSubscription(message);
Object.values(this.subscriptions).forEach(({ onNotification }) => {
onNotification?.(message);
});
}
protected onNotificationError(message: NotificationCallbackError): void {
Object.values(this.subscriptions).forEach(({ onNotificationError }) => {
onNotificationError?.(message);
});
if (message.type === "disconnectedNotAttemptingReconnectError") {
this.isOpen = false;
}
}
async subscribeToNotifications(
subscriptionCallbacks: SubscriptionCallbacks,
): Promise<SubscribeResult> {
if (!this.isOpen) {
const openResult = await this.open();
if (openResult.type !== "success") return openResult;
this.isOpen = true;
}
const subscriptionId = v4();
this.subscriptions[subscriptionId] = subscriptionCallbacks;
return {
isError: false,
type: "subscribeToNotificationSuccess",
uri: this.resource.uri,
subscriptionId,
};
}
async unsubscribeFromNotification(
subscriptionId: string,
): Promise<UnsubscribeResult> {
delete this.subscriptions[subscriptionId];
if (Object.keys(this.subscriptions).length === 0) {
const closeResult = await this.close();
if (closeResult.type !== "success") return closeResult;
this.isOpen = false;
}
return {
isError: false,
type: "unsubscribeFromNotificationSuccess",
uri: this.resource.uri,
subscriptionId,
};
}
async unsubscribeFromAllNotifications(): Promise<UnsubscribeResult[]> {
return Promise.all(
Object.keys(this.subscriptions).map((id) =>
this.unsubscribeFromNotification(id),
),
);
}
/**
* @internal
* Opens the subscription
*/
abstract open(): Promise<OpenSubscriptionResult>;
protected abstract open(): Promise<OpenResult>;
/**
* @internal
* Closes the subscription
*/
abstract close(): Promise<CloseSubscriptionResult>;
protected abstract close(): Promise<CloseResult>;
}

@ -1,12 +1,13 @@
import { UnexpectedResourceError } from "../../requester/results/error/ErrorResult";
import type {
CloseSubscriptionResult,
OpenSubscriptionResult,
} from "./NotificationSubscription";
import type { CloseResult, OpenResult } from "./NotificationSubscription";
import { NotificationSubscription } from "./NotificationSubscription";
import { SubscriptionClient } from "@solid-notifications/subscription";
import { WebSocket } from "ws";
import { UnsupportedNotificationError } from "./results/NotificationErrors";
import {
DisconnectedAttemptingReconnectError,
DisconnectedNotAttemptingReconnectError,
UnsupportedNotificationError,
} from "./results/NotificationErrors";
import type { NotificationMessage } from "./NotificationMessage";
import type { Resource } from "../Resource";
import type { SolidLdoDatasetContext } from "../../SolidLdoDatasetContext";
@ -22,18 +23,27 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
private socket: WebSocket | undefined;
private createWebsocket: (address: string) => WebSocket;
// Reconnection data
// How often we should attempt a reconnection
private reconnectInterval = 5000;
// How many attempts have already been tried for a reconnection
private reconnectAttempts = 0;
// Whether or not the socket was manually closes
private isManualClose = false;
// Maximum number of attempts to reconnect
private maxReconnectAttempts = 10;
constructor(
resource: Resource,
onNotification: (message: NotificationMessage) => void,
onError: ((err: Error) => void) | undefined,
parentSubscription: (message: NotificationMessage) => void,
context: SolidLdoDatasetContext,
createWebsocket?: (address: string) => WebSocket,
) {
super(resource, onNotification, onError, context);
super(resource, parentSubscription, context);
this.createWebsocket = createWebsocket ?? createWebsocketDefault;
}
async open(): Promise<OpenSubscriptionResult> {
async open(): Promise<OpenResult> {
try {
const notificationChannel = await this.discoverNotificationChannel();
return this.subscribeToWebsocket(notificationChannel);
@ -48,7 +58,7 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
}
}
async discoverNotificationChannel(): Promise<NotificationChannel> {
public async discoverNotificationChannel(): Promise<NotificationChannel> {
const client = new SubscriptionClient(this.context.fetch);
return await client.subscribe(
this.resource.uri,
@ -56,43 +66,75 @@ export class Websocket2023NotificationSubscription extends NotificationSubscript
);
}
async subscribeToWebsocket(
public async subscribeToWebsocket(
notificationChannel: NotificationChannel,
): Promise<OpenSubscriptionResult> {
return new Promise<OpenSubscriptionResult>((resolve) => {
): Promise<OpenResult> {
return new Promise<OpenResult>((resolve) => {
let didResolve = false;
this.socket = this.createWebsocket(
notificationChannel.receiveFrom as string,
);
this.socket.onopen = () => {
this.reconnectAttempts = 0; // Reset attempts on successful connection
this.isManualClose = false; // Reset manual close flag
didResolve = true;
resolve({
type: "success",
});
};
this.socket.onmessage = (message) => {
const messageData = message.data.toString();
// TODO uncompliant Pod error on misformatted message
this.onNotification(JSON.parse(messageData) as NotificationMessage);
};
this.socket.onclose = this.onClose.bind(this);
this.socket.onerror = (err) => {
if (!didResolve) {
resolve(UnexpectedResourceError.fromThrown(this.resource.uri, err));
} else {
this.onError?.(err.error);
this.onNotificationError(
new UnexpectedResourceError(this.resource.uri, err.error),
);
}
};
this.socket.onopen = () => {
didResolve = true;
resolve({
isError: false,
type: "subscribeToNotificationSuccess",
uri: this.resource.uri,
});
};
});
}
async close(): Promise<CloseSubscriptionResult> {
private onClose() {
if (!this.isManualClose) {
// Attempt to reconnect only if the disconnection was unintentional
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const backoffTime = Math.min(
this.reconnectInterval * this.reconnectAttempts,
30000,
); // Cap backoff at 30 seconds
setTimeout(this.open, backoffTime);
this.onNotificationError(
new DisconnectedAttemptingReconnectError(
this.resource.uri,
`Attempting to reconnect to Websocket for ${this.resource.uri}.`,
),
);
} else {
this.onNotificationError(
new DisconnectedNotAttemptingReconnectError(
this.resource.uri,
`Lost connection to websocket for ${this.resource.uri}.`,
),
);
}
}
}
protected async close(): Promise<CloseResult> {
this.socket?.terminate();
return {
type: "unsubscribeFromNotificationSuccess",
isError: false,
uri: this.resource.uri,
type: "success",
};
}
}

@ -1,3 +1,4 @@
import type { UnexpectedResourceError } from "../../../requester/results/error/ErrorResult";
import { ResourceError } from "../../../requester/results/error/ErrorResult";
/**
@ -7,3 +8,28 @@ import { ResourceError } from "../../../requester/results/error/ErrorResult";
export class UnsupportedNotificationError extends ResourceError {
readonly type = "unsupportedNotificationError" as const;
}
/**
* =============================================================================
* CALLBACK ERRORS
* =============================================================================
*/
export type NotificationCallbackError =
| DisconnectedAttemptingReconnectError
| DisconnectedNotAttemptingReconnectError
| UnexpectedResourceError;
/**
* Indicates that the socket has disconnected and is attempting to reconnect.
*/
export class DisconnectedAttemptingReconnectError extends ResourceError {
readonly type = "disconnectedAttemptingReconnectError" as const;
}
/**
* Indicates that the socket has disconnected and is attempting to reconnect.
*/
export class DisconnectedNotAttemptingReconnectError extends ResourceError {
readonly type = "disconnectedNotAttemptingReconnectError" as const;
}

@ -5,4 +5,5 @@ import type { ResourceSuccess } from "../../../requester/results/success/Success
*/
export interface SubscribeToNotificationSuccess extends ResourceSuccess {
type: "subscribeToNotificationSuccess";
subscriptionId: string;
}

@ -5,4 +5,5 @@ import type { ResourceSuccess } from "../../../requester/results/success/Success
*/
export interface UnsubscribeToNotificationSuccess extends ResourceSuccess {
type: "unsubscribeFromNotificationSuccess";
subscriptionId: string;
}

@ -175,7 +175,7 @@ describe("Integration", () => {
app.stop();
process.env.JEST_WORKER_ID = previousJestId;
process.env.NODE_ENV = previousNodeEnv;
const testDataPath = path.join(__dirname, "../data");
const testDataPath = path.join(__dirname, "./data");
await fs.rm(testDataPath, { recursive: true, force: true });
});
@ -2037,6 +2037,7 @@ describe("Integration", () => {
const subscriptionResult = await resource.subscribeToNotifications();
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
if (subscriptionResult.type !== "subscribeToNotificationSuccess") return;
expect(resource.isSubscribedToNotifications()).toBe(true);
@ -2060,7 +2061,9 @@ describe("Integration", () => {
// Notification is not propogated after unsubscribe
spidermanCallback.mockClear();
const unsubscribeResponse = await resource.unsubscribeFromNotifications();
const unsubscribeResponse = await resource.unsubscribeFromNotifications(
subscriptionResult.subscriptionId,
);
expect(unsubscribeResponse.type).toBe(
"unsubscribeFromNotificationSuccess",
);
@ -2116,7 +2119,7 @@ describe("Integration", () => {
expect(spidermanCallback).toHaveBeenCalledTimes(1);
expect(containerCallback).toHaveBeenCalledTimes(1);
await resource.unsubscribeFromNotifications();
await resource.unsubscribeFromAllNotifications();
});
it("handles notification when subscribed to a parent with a deleted child", async () => {
@ -2151,7 +2154,7 @@ describe("Integration", () => {
expect(spidermanCallback).toHaveBeenCalledTimes(1);
expect(containerCallback).toHaveBeenCalledTimes(1);
await testContainer.unsubscribeFromNotifications();
await testContainer.unsubscribeFromAllNotifications();
});
it("handles notification when subscribed to a parent with an added child", async () => {
@ -2190,7 +2193,7 @@ describe("Integration", () => {
expect(spidermanCallback).toHaveBeenCalledTimes(1);
expect(containerCallback).toHaveBeenCalledTimes(1);
await testContainer.unsubscribeFromNotifications();
await testContainer.unsubscribeFromAllNotifications();
});
it("returns an error when it cannot subscribe to a notification", async () => {
@ -2222,8 +2225,8 @@ describe("Integration", () => {
it("causes no problems when unsubscribing when not subscribed", async () => {
const resource = solidLdoDataset.getResource(SAMPLE_DATA_URI);
const result = await resource.unsubscribeFromNotifications();
expect(result.type).toBe("unsubscribeFromNotificationSuccess");
const result = await resource.unsubscribeFromAllNotifications();
expect(result.length).toBe(0);
});
});
});

@ -7,14 +7,12 @@ import type { NotificationChannel } from "@solid-notifications/types";
describe("Websocket2023NotificationSubscription", () => {
it("returns an error when websockets have an error", async () => {
const WebSocketMock: WebSocket = {} as WebSocket;
const onErrorMock = jest.fn();
const subscription = new Websocket2023NotificationSubscription(
new Leaf("https://example.com", {
fetch,
} as unknown as SolidLdoDatasetContext),
() => {},
onErrorMock,
{} as unknown as SolidLdoDatasetContext,
() => WebSocketMock,
);
@ -25,23 +23,19 @@ describe("Websocket2023NotificationSubscription", () => {
WebSocketMock.onopen?.({} as Event);
const subscriptionResult = await subPromise;
expect(subscriptionResult.type).toBe("subscribeToNotificationSuccess");
expect(subscriptionResult.type).toBe("success");
WebSocketMock.onerror?.({ error: new Error("Test Error") } as ErrorEvent);
expect(onErrorMock).toHaveBeenCalled();
});
it("returns an error when websockets have an error at the beginning", async () => {
const WebSocketMock: WebSocket = {} as WebSocket;
const onErrorMock = jest.fn();
const subscription = new Websocket2023NotificationSubscription(
new Leaf("https://example.com", {
fetch,
} as unknown as SolidLdoDatasetContext),
() => {},
onErrorMock,
{} as unknown as SolidLdoDatasetContext,
() => WebSocketMock,
);

Loading…
Cancel
Save