From dd616ed017570f8a78c294b85a53525f1839733a Mon Sep 17 00:00:00 2001 From: Elliot Scribner Date: Fri, 22 Nov 2024 11:45:17 -0800 Subject: [PATCH] Don't await batches --- src/handler/utils.ts | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/src/handler/utils.ts b/src/handler/utils.ts index 1f541641..c1c1228d 100644 --- a/src/handler/utils.ts +++ b/src/handler/utils.ts @@ -5,9 +5,10 @@ import { ManyQueryResponse, ManyResponse, StatusExecution } from './types'; * @ignore */ export const chunkArray = (list, size) => { + const clonedList = [...list]; const results: any = []; - while (list.length) { - results.push(list.splice(0, size)); + while (clonedList.length) { + results.push(clonedList.splice(0, size)); } return results; }; @@ -15,7 +16,7 @@ export const chunkArray = (list, size) => { /** * @ignore */ -function* processBatch(items, fn, metadata, extra, options): IterableIterator { +function* processBatch(items, fn, metadata, extra, options): IterableIterator> { const clonedItems = [...items]; for (const items of clonedItems) { yield fn(items, metadata, extra, options) @@ -31,14 +32,18 @@ export const batchProcessQueue = (metadata: ModelMetadata) => async (items: unknown[], fn: unknown, extra: Record = {}, options: any = {}, throttle = 100) => { const chunks = chunkArray([...items], throttle); - const chunkPromises = chunks.map((data) => Promise.resolve(data)); const result: ManyResponse = { success: 0, match_number: items.length, errors: [], data: [] }; - for await (const chunk of chunkPromises) { - for await (const r of processBatch(chunk, fn, metadata, extra, options)) { + for (const chunk of chunks) { + const promises: Promise[] = []; + for (const promise of processBatch(chunk, fn, metadata, extra, options)) { + promises.push(promise); + } + const batchResults = await Promise.all(promises); + for (const r of batchResults) { if (r.status === 'FAILURE') { result.errors.push(r); } else { - result.success = result.success + 1; + result.success += 1; if (r.payload) { result.data?.push(r.payload as T); }