diff --git a/README.md b/README.md index 45b8ff8..0f59ebf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Build - CI](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml) [![Package and Publish](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml) +[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) [![API Docs](https://img.shields.io/badge/API%20Docs-View%20Here-blue)](https://tech.shutterstock.com/p-map-iterable/) [![Build - CI](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml) [![Package and Publish](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml) [![Publish Docs](https://github.com/shutterstock/p-map-iterable/actions/workflows/docs.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/docs.yml) # Overview @@ -21,12 +21,24 @@ These classes will typically be helpful in batch or queue consumers, not as much - We can also use `IterableQueueMapper.enqueue()` to write the files back to S3 without waiting for them to finish, unless we get more than, say, 3-4 files being uploaded at once, at which point we can pause until the current file is uploaded before we allow queuing another file for upload - In the rare case that 3-4 files are uploading, but not yet finished, we would block on `.enqueue()` and not consume much CPU while waiting for at least 1 upload to finish -# Installation +# Getting Started + +## Installation The package is 
available on npm as [@shutterstock/p-map-iterable](https://www.npmjs.com/package/@shutterstock/p-map-iterable) `npm i @shutterstock/p-map-iterable` +## Importing + +```typescript +import { IterableMapper, IterableQueueMapper, IterableQueueMapperSimple } from '@shutterstock/p-map-iterable'; +``` + +## API Documentation + +After installing the package, you might want to look at our [API Documentation](https://tech.shutterstock.com/p-map-iterable/) to learn about all the features available. + # `p-map-iterable` vs `p-map` vs `p-queue` These diagrams illustrate the differences in operation between `p-map`, `p-queue`, and `p-map-iterable`. @@ -45,28 +57,28 @@ These diagrams illustrate the differences in operation between `p-map`, `p-queu # Features -- `IterableMapper` +- [IterableMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableMapper.html) - Interface and concept based on: [p-map](https://github.com/sindresorhus/p-map) - Allows a sync or async iterable input - User supplied sync or async mapper function - Exposes an async iterable interface for consuming mapped items - Allows a maximum queue depth of mapped items - if the consumer stops consuming, the queue will fill up, at which point the mapper will stop being invoked until an item is consumed from the queue - This allows mapping with back pressure so that the mapper does not consume unlimited resources (e.g. 
memory, disk, network, event loop time) by racing ahead of the consumer -- `IterableQueueMapper` +- [IterableQueueMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapper.html) - Wraps `IterableMapper` - Adds items to the queue via the `enqueue` method -- `IterableQueueMapperSimple` +- [IterableQueueMapperSimple](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapperSimple.html) - Wraps `IterableQueueMapper` - Discards results as they become available - Exposes any accumulated errors through the `errors` property instead of throwing an `AggregateError` - Not actually `Iterable` - May rename this before 1.0.0 ## Lower Level Utilities -- `IterableQueue` +- [IterableQueue](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueue.html) - Lower level utility class - Wraps `BlockingQueue` - Exposes an async iterable interface for consuming items in the queue -- `BlockingQueue` +- [BlockingQueue](https://tech.shutterstock.com/p-map-iterable/classes/BlockingQueue.html) - Lower level utility class - `dequeue` blocks until an item is available or until all items have been removed, then returns `undefined` - `enqueue` blocks if the queue is full diff --git a/examples/iterable-queue-mapper-simple.ts b/examples/iterable-queue-mapper-simple.ts index caab26e..7a6ae79 100644 --- a/examples/iterable-queue-mapper-simple.ts +++ b/examples/iterable-queue-mapper-simple.ts @@ -72,8 +72,10 @@ async function main() { // Check for errors if (prefetcher.errors.length > 0) { console.error('Errors:'); - prefetcher.errors.forEach((error) => - console.error((error as Error).message ? (error as Error).message : error), + prefetcher.errors.forEach(({ error, item }) => + console.error( + `${item} had error: ${(error as Error).message ? 
(error as Error).message : error}`, + ), ); } diff --git a/src/iterable-mapper.ts b/src/iterable-mapper.ts index 4eca6cf..485c8c1 100644 --- a/src/iterable-mapper.ts +++ b/src/iterable-mapper.ts @@ -63,7 +63,7 @@ type NewElementOrError = { * * @remarks * - * Essentially - This allows performing a concurrent mapping with + * This allows performing a concurrent mapping with * back pressure (won't iterate all source items if the consumer is * not reading). * @@ -93,7 +93,21 @@ export class IterableMapper implements AsyncIterable { expect(backgroundWriter.isIdle).toBe(true); expect(backgroundWriter.errors.length).toBe(5); - expect(backgroundWriter.errors[0]).toBeInstanceOf(Error); - expect((backgroundWriter.errors[0] as Error).message).toBe('stop this now'); + expect(backgroundWriter.errors[0].error).toBeInstanceOf(Error); + expect((backgroundWriter.errors[0].error as Error).message).toBe('stop this now'); // Show that double onIdle() does not hang or cause an error await backgroundWriter.onIdle(); diff --git a/src/iterable-queue-mapper-simple.ts b/src/iterable-queue-mapper-simple.ts index 4769b1b..f0f4ff0 100644 --- a/src/iterable-queue-mapper-simple.ts +++ b/src/iterable-queue-mapper-simple.ts @@ -2,11 +2,22 @@ import { Mapper } from './iterable-mapper'; import { IterableQueueMapper } from './iterable-queue-mapper'; // eslint-disable-next-line @typescript-eslint/no-explicit-any -type Errors = (string | { [key: string]: any } | Error)[]; +type Errors = { item: T; error: string | { [key: string]: any } | Error }[]; const NoResult = Symbol('noresult'); /** + * Accepts queue items via `enqueue` and calls the `mapper` on them + * with specified concurrency, discarding the + * `mapper` results and accumulating any errors thrown by the + * `mapper` in the `errors` property. The `enqueue` method will block if + * the queue is full, until there is room. 
+ * + * @remarks + * + * Note: the name is somewhat of a misnomer as this wraps `IterableQueueMapper` + * but is not itself an `Iterable`. + * * Accepts items for mapping in the background, discards the results, * but accumulates exceptions in the `errors` property. * @@ -17,7 +28,7 @@ const NoResult = Symbol('noresult'); */ export class IterableQueueMapperSimple { private readonly _writer: IterableQueueMapper; - private readonly _errors: Errors = []; + private readonly _errors: Errors = []; private readonly _done: Promise; private readonly _mapper: Mapper; private _isIdle = false; @@ -25,7 +36,16 @@ export class IterableQueueMapperSimple { /** * Create a new `IterableQueueMapperSimple` * - * @param mapper Function which is called for every item in `input`. Expected to return a `Promise` or value. + * @param mapper Function which is called for every item in `input`. + * Expected to return a `Promise` or value. + * + * The `mapper` *should* handle all errors and not allow an error to be thrown + * out of the `mapper` function as this enables the best handling of errors + * closest to the time that they occur. + * + * If the `mapper` function does allow an error to be thrown then the + * errors will be accumulated in the `errors` property. + * @param options IterableQueueMapperSimple options */ constructor( mapper: Mapper, @@ -62,15 +82,24 @@ export class IterableQueueMapperSimple { await this._mapper(item, index); // eslint-disable-next-line @typescript-eslint/no-explicit-any } catch (error: any) { - this._errors.push(error); + this._errors.push({ item, error }); } return NoResult; } /** * Accumulated errors from background `mapper`s + * + * @remarks + * + * Note that this property can be periodically checked + * during processing and errors can be `.pop()`'d off of the array + * and logged / handled as desired. Errors `.pop()`'d off of the array + * will no longer be available in the array on the next check. 
+ * + * @returns Reference to the errors array */ - public get errors(): Errors { + public get errors(): Errors { return this._errors; } diff --git a/src/iterable-queue-mapper.ts b/src/iterable-queue-mapper.ts index 6ea4467..1e38918 100644 --- a/src/iterable-queue-mapper.ts +++ b/src/iterable-queue-mapper.ts @@ -32,20 +32,29 @@ export interface IterableQueueMapperOptions { } /** - * Iterates over a source iterable with specified concurrency, - * calling the `mapper` on each iterated item, and storing the + * Accepts queue items via `enqueue` and calls the `mapper` on them + * with specified concurrency, storing the * `mapper` result in a queue of specified max size, before - * being iterated / read by the caller. + * being iterated / read by the caller. The `enqueue` method will block if + * the queue is full, until an item is read. * * @remarks * - * Essentially - This allows performing a concurrent mapping with - * back pressure (won't iterate all source items if the consumer is - * not reading). + * This allows performing a concurrent mapping with + * back pressure for items added after queue creation + * via a method call. * - * Typical use case is for a `prefetcher` that ensures that items - * are always ready for the consumer but that large numbers of items - * are not processed before the consumer is ready for them. + * Because items are added via a method call it is possible to + * chain an `IterableMapper` that prefetches files and processes them, + * with an `IterableQueueMapper` that processes the results of the + * `mapper` function of the `IterableMapper`. + * + * Typical use case is for a `background uploader` that prevents + * the producer from racing ahead of the upload process, consuming + * too much memory or disk space. As items are ready for upload + * they are added to the queue with the `enqueue` method, which is + * `await`ed by the caller. 
If the queue has room then `enqueue` + * will return immediately, otherwise it will block until there is room. * * @category Enqueue Input */ @@ -57,7 +66,21 @@ export class IterableQueueMapper implements AsyncIterable, options: IterableQueueMapperOptions = {}) {