Skip to content

Commit

Permalink
fix: issue when payload is empty - not calling onDone
Browse files Browse the repository at this point in the history
  • Loading branch information
Avivbens committed Jan 1, 2024
1 parent 3399f4c commit 8198649
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 5 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@
"coverageDirectory": "../coverage",
"testEnvironment": "node",
"clearMocks": true,
"testTimeout": 10000
"testTimeout": 3000
}
}
44 changes: 44 additions & 0 deletions src/parallel/parallel.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,50 @@ describe('Parallel', () => {
})
})

it('should escape and execute onDone when payload is empty', (done) => {
const callFn = jest.fn()
const failFn = jest.fn()
const payload: any[] = []
const length = payload.length

Parallel.execute({
handler: async (item) => {
callFn(item)
return new Promise((resolve) => setTimeout(resolve, 5))
},
concurrency: 10,
payload,
onItemFail: failFn,
onDone: () => {
expect(callFn).toBeCalledTimes(length)
expect(failFn).toBeCalledTimes(0)
done()
},
})
})

it('should escape and execute onDone when payload is 1 object', (done) => {
const callFn = jest.fn()
const failFn = jest.fn()
const payload = [1]
const length = payload.length

Parallel.execute({
handler: async (item) => {
callFn(item)
return new Promise((resolve) => setTimeout(resolve, 5))
},
concurrency: 10,
payload,
onItemFail: failFn,
onDone: () => {
expect(callFn).toBeCalledTimes(length)
expect(failFn).toBeCalledTimes(0)
done()
},
})
})

it('should call onItemFail on promise reject', (done) => {
const callFn = jest.fn()
const failFn = jest.fn()
Expand Down
17 changes: 13 additions & 4 deletions src/parallel/parallel.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import { DEFAULT_EXECUTION_OPTIONS } from './default'

export class Parallel {
/**
* @description Executes handler for each item in payload
* @param options - execution options
* @returns subscription of the execution
*
* T - type of payload item
* K - type of handler return value
*/
Expand All @@ -15,20 +19,25 @@ export class Parallel {
IExecutionOptions<T, K>
>({}, DEFAULT_EXECUTION_OPTIONS, options)

const limitedConcurrency: number = Math.min(payload.length, concurrency)

const firstItems =
processDirection === 'fifo'
? payload.splice(0, concurrency)
: payload.splice(payload.length - <number>concurrency, concurrency)
? payload.splice(0, limitedConcurrency)
: payload.splice(payload.length - limitedConcurrency, limitedConcurrency)

const executions$: Subscription = new Subscription()
const isDoneChecker$ = new Subject<boolean>()

const skipCount: number = concurrency > payload.length ? payload.length : concurrency
if (limitedConcurrency === 0) {
onDone?.()
return executions$
}

isDoneChecker$
.pipe(
filter((isDone) => isDone),
skip(<number>skipCount - 1),
skip(limitedConcurrency - 1),
take(1),
switchMap(async () => onDone?.()),
)
Expand Down

0 comments on commit 8198649

Please sign in to comment.