Skip to content

Commit

Permalink
Don't await batches
Browse files Browse the repository at this point in the history
  • Loading branch information
ejscribner committed Nov 22, 2024
1 parent 4ceeda0 commit dd616ed
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions src/handler/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,18 @@ import { ManyQueryResponse, ManyResponse, StatusExecution } from './types';
* @ignore
*/
export const chunkArray = (list, size) => {
const clonedList = [...list];
const results: any = [];
while (list.length) {
results.push(list.splice(0, size));
while (clonedList.length) {
results.push(clonedList.splice(0, size));
}
return results;
};

/**
* @ignore
*/
function* processBatch(items, fn, metadata, extra, options): IterableIterator<StatusExecution> {
function* processBatch(items, fn, metadata, extra, options): IterableIterator<Promise<StatusExecution>> {
const clonedItems = [...items];
for (const items of clonedItems) {
yield fn(items, metadata, extra, options)
Expand All @@ -31,14 +32,18 @@ export const batchProcessQueue =
<T = any>(metadata: ModelMetadata) =>
async (items: unknown[], fn: unknown, extra: Record<string, unknown> = {}, options: any = {}, throttle = 100) => {
const chunks = chunkArray([...items], throttle);
const chunkPromises = chunks.map((data) => Promise.resolve(data));
const result: ManyResponse<T> = { success: 0, match_number: items.length, errors: [], data: [] };
for await (const chunk of chunkPromises) {
for await (const r of processBatch(chunk, fn, metadata, extra, options)) {
for (const chunk of chunks) {
const promises: Promise<StatusExecution>[] = [];
for (const promise of processBatch(chunk, fn, metadata, extra, options)) {
promises.push(promise);
}
const batchResults = await Promise.all(promises);
for (const r of batchResults) {
if (r.status === 'FAILURE') {
result.errors.push(r);
} else {
result.success = result.success + 1;
result.success += 1;
if (r.payload) {
result.data?.push(r.payload as T);
}
Expand Down

0 comments on commit dd616ed

Please sign in to comment.