Fixed DoubleFetch

main
jaxoncreed 2 years ago
parent c0c8eb6afb
commit a9e93671f6
  1. 14
      packages/solid/src/requester/ContainerRequester.ts
  2. 24
      packages/solid/src/requester/LeafRequester.ts
  3. 38
      packages/solid/src/requester/Requester.ts
  4. 66
      packages/solid/src/util/RequestBatcher.ts
  5. 1
      packages/solid/tsconfig.build.json

@ -45,12 +45,16 @@ export class ContainerRequester extends Requester {
name: IS_ROOT_CONTAINER_KEY, name: IS_ROOT_CONTAINER_KEY,
args: [this.uri as ContainerUri, { fetch: this.context.fetch }], args: [this.uri as ContainerUri, { fetch: this.context.fetch }],
perform: checkRootContainer, perform: checkRootContainer,
modifyQueue: (queue, isLoading) => { modifyQueue: (queue, currentlyLoading) => {
if (queue.length === 0) { if (
return isLoading[IS_ROOT_CONTAINER_KEY]; queue.length === 0 &&
} else { currentlyLoading?.name === IS_ROOT_CONTAINER_KEY
return queue[queue.length - 1].name === IS_ROOT_CONTAINER_KEY; ) {
return currentlyLoading;
} else if (queue[queue.length - 1]?.name === IS_ROOT_CONTAINER_KEY) {
return queue[queue.length - 1];
} }
return undefined;
}, },
}); });
} }

@ -70,14 +70,14 @@ export class LeafRequester extends Requester {
{ fetch: this.context.fetch, onRollback: () => transaction.rollback() }, { fetch: this.context.fetch, onRollback: () => transaction.rollback() },
], ],
perform: updateDataResource, perform: updateDataResource,
modifyQueue: (queue, isLoading, [, changes]) => { modifyQueue: (queue, currentlyProcessing, [, changes]) => {
if (queue[queue.length - 1].name === UPDATE_KEY) { if (queue[queue.length - 1]?.name === UPDATE_KEY) {
// Merge Changes // Merge Changes
const originalChanges = queue[queue.length - 1].args[1]; const originalChanges = queue[queue.length - 1].args[1];
mergeDatasetChanges(originalChanges, changes); mergeDatasetChanges(originalChanges, changes);
return true; return queue[queue.length - 1];
} }
return false; return undefined;
}, },
}); });
return result; return result;
@ -120,13 +120,23 @@ export class LeafRequester extends Requester {
{ dataset: transaction, fetch: this.context.fetch }, { dataset: transaction, fetch: this.context.fetch },
], ],
perform: uploadResource, perform: uploadResource,
modifyQueue: (queue, isLoading, args) => { modifyQueue: (queue, currentlyLoading, args) => {
const lastElementInQueue = queue[queue.length - 1]; const lastElementInQueue = queue[queue.length - 1];
return ( if (
lastElementInQueue && lastElementInQueue &&
lastElementInQueue.name === UPLOAD_KEY && lastElementInQueue.name === UPLOAD_KEY &&
!!lastElementInQueue.args[3] === !!args[3] !!lastElementInQueue.args[3] === !!args[3]
); ) {
return lastElementInQueue;
}
if (
currentlyLoading &&
currentlyLoading.name === UPLOAD_KEY &&
!!currentlyLoading.args[3] === !!args[3]
) {
return currentlyLoading;
}
return undefined;
}, },
}); });
if (!result.isError) { if (!result.isError) {

@ -52,12 +52,13 @@ export abstract class Requester {
name: READ_KEY, name: READ_KEY,
args: [this.uri, { dataset: transaction, fetch: this.context.fetch }], args: [this.uri, { dataset: transaction, fetch: this.context.fetch }],
perform: readResource, perform: readResource,
modifyQueue: (queue, isLoading) => { modifyQueue: (queue, currentlyLoading) => {
if (queue.length === 0) { if (queue.length === 0 && currentlyLoading?.name === READ_KEY) {
return isLoading[READ_KEY]; return currentlyLoading;
} else { } else if (queue[queue.length - 1]?.name === READ_KEY) {
return queue[queue.length - 1].name === READ_KEY; return queue[queue.length - 1];
} }
return undefined;
}, },
}); });
if (!result.isError) { if (!result.isError) {
@ -75,12 +76,13 @@ export abstract class Requester {
name: DELETE_KEY, name: DELETE_KEY,
args: [this.uri, { dataset: transaction, fetch: this.context.fetch }], args: [this.uri, { dataset: transaction, fetch: this.context.fetch }],
perform: deleteResource, perform: deleteResource,
modifyQueue: (queue, isLoading) => { modifyQueue: (queue, currentlyLoading) => {
if (queue.length === 0) { if (queue.length === 0 && currentlyLoading?.name === DELETE_KEY) {
return isLoading[DELETE_KEY]; return currentlyLoading;
} else { } else if (queue[queue.length - 1]?.name === DELETE_KEY) {
return queue[queue.length - 1].name === DELETE_KEY; return queue[queue.length - 1];
} }
return undefined;
}, },
}); });
if (!result.isError) { if (!result.isError) {
@ -125,13 +127,23 @@ export abstract class Requester {
{ dataset: transaction, fetch: this.context.fetch }, { dataset: transaction, fetch: this.context.fetch },
], ],
perform: createDataResource, perform: createDataResource,
modifyQueue: (queue, isLoading, args) => { modifyQueue: (queue, currentlyLoading, args) => {
const lastElementInQueue = queue[queue.length - 1]; const lastElementInQueue = queue[queue.length - 1];
return ( if (
lastElementInQueue && lastElementInQueue &&
lastElementInQueue.name === CREATE_KEY && lastElementInQueue.name === CREATE_KEY &&
!!lastElementInQueue.args[1] === !!args[1] !!lastElementInQueue.args[1] === !!args[1]
); ) {
return lastElementInQueue;
}
if (
currentlyLoading &&
currentlyLoading.name === CREATE_KEY &&
!!currentlyLoading.args[1] === !!args[1]
) {
return currentlyLoading;
}
return undefined;
}, },
}); });
if (!result.isError) { if (!result.isError) {

@ -16,21 +16,24 @@ export interface WaitingProcessOptions<Args extends any[], Return> {
/** /**
* *
* @param processQueue The current process queue * @param processQueue The current process queue
* @param isLoading The current is loading * @param currentlyProcessing: The Process that is currently executing
* @param args provided args * @param args provided args
* @returns true if the process queue has been modified and a new process should not be added to the queue * @returns A WaitingProcess that this request should listen to, or undefined if it should create its own
*/ */
modifyQueue: ( modifyQueue: (
processQueue: WaitingProcess<any[], any>[], processQueue: WaitingProcess<any[], any>[],
isLoading: Record<string, boolean>, currentlyProcessing: WaitingProcess<any[], any> | undefined,
args: Args, args: Args,
) => boolean; ) => WaitingProcess<any[], any> | undefined;
} }
/**
* Request Batcher
*/
export class RequestBatcher { export class RequestBatcher {
private lastRequestTimestampMap: Record<string, number> = {}; private lastRequestTimestampMap: Record<string, number> = {};
private loadingMap: Record<string, boolean> = {}; private currentlyProcessing: WaitingProcess<any[], any> | undefined =
private isWaiting: boolean = false; undefined;
private processQueue: WaitingProcess<any[], any>[] = []; private processQueue: WaitingProcess<any[], any>[] = [];
public shouldBatchAllRequests: boolean; public shouldBatchAllRequests: boolean;
public batchMillis: number; public batchMillis: number;
@ -46,7 +49,7 @@ export class RequestBatcher {
} }
public isLoading(key: string): boolean { public isLoading(key: string): boolean {
return !!this.loadingMap[key]; return this.currentlyProcessing?.name === key;
} }
private triggerOrWaitProcess() { private triggerOrWaitProcess() {
@ -66,14 +69,14 @@ export class RequestBatcher {
const timeSinceLastTrigger = Date.now() - lastRequestTimestamp; const timeSinceLastTrigger = Date.now() - lastRequestTimestamp;
const triggerProcess = async () => { const triggerProcess = async () => {
if (this.isWaiting) { if (this.currentlyProcessing) {
return; return;
} }
this.lastRequestTimestampMap[processName] = Date.now(); this.lastRequestTimestampMap[processName] = Date.now();
this.lastRequestTimestampMap[ANY_KEY] = Date.now(); this.lastRequestTimestampMap[ANY_KEY] = Date.now();
const processToTrigger = this.processQueue.shift(); const processToTrigger = this.processQueue.shift();
if (processToTrigger) { if (processToTrigger) {
this.isWaiting = true; this.currentlyProcessing = processToTrigger;
try { try {
const returnValue = await processToTrigger.perform( const returnValue = await processToTrigger.perform(
...processToTrigger.args, ...processToTrigger.args,
@ -86,22 +89,9 @@ export class RequestBatcher {
callback(err); callback(err);
}); });
} }
this.isWaiting = false; this.currentlyProcessing = undefined;
// Reset loading this.triggerOrWaitProcess();
if (
!this.processQueue.some(
(process) => process.name === processToTrigger.name,
)
) {
this.loadingMap[processToTrigger.name] = false;
}
if (this.processQueue.length > 0) {
this.triggerOrWaitProcess();
} else {
this.loadingMap[ANY_KEY] = false;
}
} }
}; };
@ -116,22 +106,18 @@ export class RequestBatcher {
options: WaitingProcessOptions<Args, ReturnType>, options: WaitingProcessOptions<Args, ReturnType>,
): Promise<ReturnType> { ): Promise<ReturnType> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const lastProcessInQueue = const shouldAwait = options.modifyQueue(
this.processQueue[this.processQueue.length - 1]; this.processQueue,
if (lastProcessInQueue) { this.currentlyProcessing,
const didModifyLast = lastProcessInQueue options.args,
? options.modifyQueue( );
this.processQueue,
this.loadingMap, if (shouldAwait) {
options.args, shouldAwait.awaitingResolutions.push(resolve);
) shouldAwait.awaitingRejections.push(reject);
: false; return;
if (didModifyLast) {
lastProcessInQueue.awaitingResolutions.push(resolve);
lastProcessInQueue.awaitingRejections.push(reject);
return;
}
} }
const waitingProcess: WaitingProcess<Args, ReturnType> = { const waitingProcess: WaitingProcess<Args, ReturnType> = {
name: options.name, name: options.name,
args: options.args, args: options.args,
@ -143,8 +129,6 @@ export class RequestBatcher {
this.processQueue.push( this.processQueue.push(
waitingProcess as unknown as WaitingProcess<any[], any>, waitingProcess as unknown as WaitingProcess<any[], any>,
); );
this.loadingMap[waitingProcess.name] = true;
this.loadingMap[ANY_KEY] = true;
this.triggerOrWaitProcess(); this.triggerOrWaitProcess();
}); });
} }

@ -1,6 +1,7 @@
{ {
"extends": "../../tsconfig.base.json", "extends": "../../tsconfig.base.json",
"compilerOptions": { "compilerOptions": {
"strict": false,
"outDir": "./dist", "outDir": "./dist",
}, },
"include": ["./src"] "include": ["./src"]

Loading…
Cancel
Save