Skip to content

Commit

Permalink
Doc improvements / extend errors property (#4)
Browse files Browse the repository at this point in the history
- Add API Docs badge to README
- Add link to API Docs from `Getting Started` section
- Fix incorrect class summaries
- Wrap `{ item, error}` into the `errors` property
  - This is an interface change... before anyone uses this
- Add guidance on handling exceptions from `mapper` functions
  • Loading branch information
huntharo authored Jun 16, 2023
1 parent a442cab commit 7b3c80f
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 28 deletions.
26 changes: 19 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Build - CI](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml) [![Package and Publish](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml)
[![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) [![API Docs](https://img.shields.io/badge/API%20Docs-View%20Here-blue)](https://tech.shutterstock.com/p-map-iterable/) [![Build - CI](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/ci.yml) [![Package and Publish](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/publish.yml) [![Publish Docs](https://github.com/shutterstock/p-map-iterable/actions/workflows/docs.yml/badge.svg)](https://github.com/shutterstock/p-map-iterable/actions/workflows/docs.yml)

# Overview

Expand All @@ -21,12 +21,24 @@ These classes will typically be helpful in batch or queue consumers, not as much
- We can also use `IterableQueueMapper.enqueue()` to write the files back to S3 without waiting for them to finish, unless we get more than, say, 3-4 files being uploaded at once, at which point we can pause until the current file is uploaded before we allow queuing another file for upload
- In the rare case that 3-4 files are uploading, but not yet finished, we would block on `.enqueue()` and not consume much CPU while waiting for at least 1 upload to finish

# Installation
# Getting Started

## Installation

The package is available on npm as [@shutterstock/p-map-iterable](https://www.npmjs.com/package/@shutterstock/p-map-iterable)

`npm i @shutterstock/p-map-iterable`

## Importing

```typescript
import { IterableMapper, IterableQueueMapper, IterableQueueMapperSimple } from '@shutterstock/p-map-iterable';
```

## API Documentation

After installing the package, you might want to look at our [API Documentation](https://tech.shutterstock.com/p-map-iterable/) to learn about all the features available.

# `p-map-iterable` vs `p-map` vs `p-queue`

These diagrams illustrate the differences in operation betweeen `p-map`, `p-queue`, and `p-map-iterable`.
Expand All @@ -45,28 +57,28 @@ These diagrams illustrate the differences in operation betweeen `p-map`, `p-queu

# Features

- `IterableMapper`
- [IterableMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableMapper.html)
- Interface and concept based on: [p-map](https://github.com/sindresorhus/p-map)
- Allows a sync or async iterable input
- User supplied sync or async mapper function
- Exposes an async iterable interface for consuming mapped items
- Allows a maximum queue depth of mapped items - if the consumer stops consuming, the queue will fill up, at which point the mapper will stop being invoked until an item is consumed from the queue
- This allows mapping with back pressure so that the mapper does not consume unlimited resources (e.g. memory, disk, network, event loop time) by racing ahead of the consumer
- `IterableQueueMapper`
- [IterableQueueMapper](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapper.html)
- Wraps `IterableMapper`
- Adds items to the queue via the `enqueue` method
- `IterableQueueMapperSimple`
- [IterableQueueMapperSimple](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueueMapperSimple.html)
- Wraps `IterableQueueMapper`
- Discards results as they become available
- Exposes any accumulated errors through the `errors` property instead of throwing an `AggregateError`
- Not actually `Iterable` - May rename this before 1.0.0

## Lower Level Utilities
- `IterableQueue`
- [IterableQueue](https://tech.shutterstock.com/p-map-iterable/classes/IterableQueue.html)
- Lower level utility class
- Wraps `BlockingQueue`
- Exposes an async iterable interface for consuming items in the queue
- `BlockingQueue`
- [BlockingQueue](https://tech.shutterstock.com/p-map-iterable/classes/BlockingQueue.html)
- Lower level utility class
- `dequeue` blocks until an item is available or until all items have been removed, then returns `undefined`
- `enqueue` blocks if the queue is full
Expand Down
6 changes: 4 additions & 2 deletions examples/iterable-queue-mapper-simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ async function main() {
// Check for errors
if (prefetcher.errors.length > 0) {
console.error('Errors:');
prefetcher.errors.forEach((error) =>
console.error((error as Error).message ? (error as Error).message : error),
prefetcher.errors.forEach(({ error, item }) =>
console.error(
`${item} had error: ${(error as Error).message ? (error as Error).message : error}`,
),
);
}

Expand Down
18 changes: 16 additions & 2 deletions src/iterable-mapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type NewElementOrError<NewElement = unknown> = {
*
* @remarks
*
* Essentially - This allows performing a concurrent mapping with
* This allows performing a concurrent mapping with
* back pressure (won't iterate all source items if the consumer is
* not reading).
*
Expand Down Expand Up @@ -93,7 +93,21 @@ export class IterableMapper<Element, NewElement> implements AsyncIterable<NewEle
* Create a new `IterableMapper`
*
* @param input Iterated over concurrently in the `mapper` function.
* @param mapper Function which is called for every item in `input`. Expected to return a `Promise` or value.
* @param mapper Function which is called for every item in `input`.
* Expected to return a `Promise` or value.
*
* The `mapper` *should* handle all errors and not allow an error to be thrown
* out of the `mapper` function as this enables the best handling of errors
* closest to the time that they occur.
*
* If the `mapper` function does allow an error to be thrown then the
* `stopOnMapperError` option controls the behavior:
* - `stopOnMapperError`: `true` - will throw the error
* out of `next` or the `AsyncIterator` returned from `[Symbol.asyncIterator]`
* and stop processing.
* - `stopOnMapperError`: `false` - will continue processing
* and accumulate the errors to be thrown from `next` or the `AsyncIterator`
* returned from `[Symbol.asyncIterator]` when all items have been processed.
* @param options IterableMapper options
*/
constructor(
Expand Down
4 changes: 2 additions & 2 deletions src/iterable-queue-mapper-simple.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ describe('IterableQueueMapperSimple', () => {
expect(backgroundWriter.isIdle).toBe(true);

expect(backgroundWriter.errors.length).toBe(5);
expect(backgroundWriter.errors[0]).toBeInstanceOf(Error);
expect((backgroundWriter.errors[0] as Error).message).toBe('stop this now');
expect(backgroundWriter.errors[0].error).toBeInstanceOf(Error);
expect((backgroundWriter.errors[0].error as Error).message).toBe('stop this now');

// Show that double onIdle() does not hang or cause an error
await backgroundWriter.onIdle();
Expand Down
39 changes: 34 additions & 5 deletions src/iterable-queue-mapper-simple.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,22 @@ import { Mapper } from './iterable-mapper';
import { IterableQueueMapper } from './iterable-queue-mapper';

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type Errors = (string | { [key: string]: any } | Error)[];
type Errors<T> = { item: T; error: string | { [key: string]: any } | Error }[];

const NoResult = Symbol('noresult');

/**
* Accepts queue items via `enqueue` and calls the `mapper` on them
* with specified concurrency, storing the
* `mapper` result in a queue of specified max size, before
* being iterated / read by the caller. The `enqueue` method will block if
* the queue is full, until an item is read.
*
* @remarks
*
* Note: the name is somewhat of a misnomer as this wraps `IterableQueueMapper`
* but is not itself an `Iterable`.
*
* Accepts items for mapping in the background, discards the results,
* but accumulates exceptions in the `errors` property.
*
Expand All @@ -17,15 +28,24 @@ const NoResult = Symbol('noresult');
*/
export class IterableQueueMapperSimple<Element> {
private readonly _writer: IterableQueueMapper<Element, typeof NoResult>;
private readonly _errors: Errors = [];
private readonly _errors: Errors<Element> = [];
private readonly _done: Promise<void>;
private readonly _mapper: Mapper<Element, void>;
private _isIdle = false;

/**
* Create a new `IterableQueueMapperSimple`
*
* @param mapper Function which is called for every item in `input`. Expected to return a `Promise` or value.
* @param mapper Function which is called for every item in `input`.
* Expected to return a `Promise` or value.
*
* The `mapper` *should* handle all errors and not allow an error to be thrown
* out of the `mapper` function as this enables the best handling of errors
* closest to the time that they occur.
*
* If the `mapper` function does allow an error to be thrown then the
* errors will be accumulated in the `errors` property.
* @param options IterableQueueMapperSimple options
*/
constructor(
mapper: Mapper<Element, void>,
Expand Down Expand Up @@ -62,15 +82,24 @@ export class IterableQueueMapperSimple<Element> {
await this._mapper(item, index);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (error: any) {
this._errors.push(error);
this._errors.push({ item, error });
}
return NoResult;
}

/**
* Accumulated errors from background `mappers`s
*
* @remarks
*
* Note that this property can be periodically checked
* during processing and errors can be `.pop()`'d off of the array
* and logged / handled as desired. Errors `.pop()`'d off of the array
* will no longer be available in the array on the next check.
*
* @returns Reference to the errors array
*/
public get errors(): Errors {
public get errors(): Errors<Element> {
return this._errors;
}

Expand Down
43 changes: 33 additions & 10 deletions src/iterable-queue-mapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,29 @@ export interface IterableQueueMapperOptions {
}

/**
* Iterates over a source iterable with specified concurrency,
* calling the `mapper` on each iterated item, and storing the
* Accepts queue items via `enqueue` and calls the `mapper` on them
* with specified concurrency, storing the
* `mapper` result in a queue of specified max size, before
* being iterated / read by the caller.
* being iterated / read by the caller. The `enqueue` method will block if
* the queue is full, until an item is read.
*
* @remarks
*
* Essentially - This allows performing a concurrent mapping with
* back pressure (won't iterate all source items if the consumer is
* not reading).
* This allows performing a concurrent mapping with
* back pressure for items added after queue creation
* via a method call.
*
* Typical use case is for a `prefetcher` that ensures that items
* are always ready for the consumer but that large numbers of items
* are not processed before the consumer is ready for them.
* Because items are added via a method call it is possible to
* chain an `IterableMapper` that prefetches files and processes them,
* with an `IterableQueueMapper` that processes the results of the
* `mapper` function of the `IterableMapper`.
*
* Typical use case is for a `background uploader` that prevents
* the producer from racing ahead of the upload process, consuming
* too much memory or disk space. As items are ready for upload
* they are added to the queue with the `enqueue` method, which is
* `await`ed by the caller. If the queue has room then `enqueue`
* will return immediately, otherwise it will block until there is room.
*
* @category Enqueue Input
*/
Expand All @@ -57,7 +66,21 @@ export class IterableQueueMapper<Element, NewElement> implements AsyncIterable<N
/**
* Create a new `IterableQueueMapper`
*
* @param mapper Function which is called for every item in `input`. Expected to return a `Promise` or value.
* @param mapper Function which is called for every item in `input`.
* Expected to return a `Promise` or value.
*
* The `mapper` *should* handle all errors and not allow an error to be thrown
* out of the `mapper` function as this enables the best handling of errors
* closest to the time that they occur.
*
* If the `mapper` function does allow an error to be thrown then the
* `stopOnMapperError` option controls the behavior:
* - `stopOnMapperError`: `true` - will throw the error
* out of `next` or the `AsyncIterator` returned from `[Symbol.asyncIterator]`
* and stop processing.
* - `stopOnMapperError`: `false` - will continue processing
* and accumulate the errors to be thrown from `next` or the `AsyncIterator`
* returned from `[Symbol.asyncIterator]` when all items have been processed.
* @param options IterableQueueMapper options
*/
constructor(mapper: Mapper<Element, NewElement>, options: IterableQueueMapperOptions = {}) {
Expand Down

0 comments on commit 7b3c80f

Please sign in to comment.