The StreamReader
class is a utility for reading data from a ReadableStream
in a structured and event-driven manner. It extends the EventEmitter
class, providing events for stream lifecycle management and error handling.
Run the following command to start using stream-reader
in your projects:
npm i @alessiofrittoli/stream-reader
or using pnpm
pnpm i @alessiofrittoli/stream-reader
import { StreamReader } from '@alessiofrittoli/stream-reader'
import type { ... } from '@alessiofrittoli/stream-reader/types'
The StreamReader
class constructor accepts a ReadableStream
argument. You can optionally pass type arguments I
and O
to define the type of the streamed data being read and the type of the transformed output chunk.
Example
const reader = new StreamReader<Buffer>( ... )
const reader2 = new StreamReader<Buffer, string>( ... )
Automatically inferred type
type Input = Buffer
type Output = Buffer
const stream = new TransformStream<Input, Output>()
const reader = new StreamReader( stream.readable ) // type of `StreamReader<Output, Output>`
The StreamReader
class instance exposes the following properties:
Properties
Property | Type | Description |
---|---|---|
reader | ReadableStreamDefaultReader<T> | The reader obtained from the input ReadableStream<T>. |
closed | boolean | Indicates whether the stream has been closed. |
Type parameters
Parameter | Default | Description |
---|---|---|
I | unknown | The type of input data being read from the stream. |
O | I | The type of output data transformed after reading from the stream. Defaults to the same type as I. |
The StreamReader.read()
method reads the on-demand pushed data from the given stream.
It internally uses the StreamReader.readChunks()
method to read the received chunks.
It emits useful events such as:
- data: Emitted when a chunk is received from the stream and processed by the optional transform function.
- close: Emitted when the stream is closed.
- error: Emitted when an error occurs while reading.
- See Listening Events section for further info.
Parameters
| Parameter | Type | Description |
|---|---|---|
| transform | TransformChunk<I, O> | (Optional) A function that transforms each chunk. |
Returns
Type: Promise<ReadChunks<O>>
A new Promise that resolves, once the stream is closed, with an Array of the chunks read (transformed, if a transform function was provided).
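The read-and-collect behaviour described above can be sketched with the standard Web Streams API alone. This is a minimal illustration, not the library's implementation: `readAllSketch`, `numbersStream` and `readLabels` are hypothetical names introduced here, and the sketch assumes a runtime that exposes `ReadableStream` globally (modern browsers, Node.js 18+).

```typescript
// Hypothetical helper: reads every chunk and optionally transforms it.
async function readAllSketch<I, O = I>(
	stream: ReadableStream<I>,
	transform?: ( chunk: I ) => O | Promise<O>,
): Promise<O[]> {
	const reader = stream.getReader()
	const chunks: O[] = []
	try {
		while ( true ) {
			const result = await reader.read()
			if ( result.done ) break
			if ( transform ) {
				// The transform may be synchronous or return a promise.
				chunks.push( await transform( result.value ) )
			} else {
				// Without a transform the chunk is passed through unchanged.
				chunks.push( result.value as unknown as O )
			}
		}
	} finally {
		// Release the lock whether reading finished or failed.
		reader.releaseLock()
	}
	return chunks
}

// Hypothetical in-memory source used only for this demonstration.
function numbersStream(): ReadableStream<number> {
	return new ReadableStream<number>( {
		start( controller ) {
			for ( const n of [ 1, 2, 3 ] ) controller.enqueue( n )
			controller.close()
		},
	} )
}

// Resolves once the stream is closed, with one transformed entry per chunk.
function readLabels(): Promise<string[]> {
	return readAllSketch( numbersStream(), n => `chunk ${ n }` )
}
```

The promise settles only after the source closes, which is why the documented method is awaited in the usage examples further down.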
The StreamReader.readChunks()
method reads the on-demand pushed data from the given stream.
Returns
Type: AsyncGenerator<ReadChunk<I>>
An async iterable object for consuming chunks of data.
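To make the "async iterable" return type concrete, here is a small sketch of an async generator that yields chunks from a stream reader. It relies only on the standard Web Streams API; `readChunksSketch`, `demoStream` and `collectDemoChunks` are assumed names for illustration and do not reflect how the library is written internally.

```typescript
// Hypothetical generator: yields each chunk as soon as it is read.
async function* readChunksSketch<T>( stream: ReadableStream<T> ): AsyncGenerator<T> {
	const reader = stream.getReader()
	try {
		while ( true ) {
			const result = await reader.read()
			if ( result.done ) break
			yield result.value
		}
	} finally {
		// Runs even if the consumer stops iterating early.
		reader.releaseLock()
	}
}

// Small in-memory stream to iterate over.
const demoStream = new ReadableStream<string>( {
	start( controller ) {
		controller.enqueue( 'data 1' )
		controller.enqueue( 'data 2' )
		controller.close()
	},
} )

// Consumes the generator with `for await`, one chunk at a time.
async function collectDemoChunks(): Promise<string[]> {
	const received: string[] = []
	for await ( const chunk of readChunksSketch( demoStream ) ) {
		received.push( chunk )
	}
	return received
}
```

Consuming chunks through `for await` lets the caller process each chunk as it arrives instead of waiting for the whole stream.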
The StreamReader.cancel()
method is useful when reading stream data is no longer needed, regardless of the stream writer state.
This method cancels the reader, releases the lock, emits the cancel event, and removes the data, close and cancel event listeners.
- See Listening Events section for further info.
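For context, the sketch below shows what cancelling a standard `ReadableStreamDefaultReader` does at the platform level: the underlying source receives the reason, and later reads report `done: true`. It does not use the library; `endless`, `cancelReason` and `cancelDemo` are hypothetical names, and the event emission and listener cleanup described above are not reproduced here.

```typescript
// Records the reason handed to the underlying source on cancellation.
let cancelReason: unknown

// A source that never closes on its own.
const endless = new ReadableStream<string>( {
	pull( controller ) {
		controller.enqueue( 'tick' )
	},
	cancel( reason ) {
		// Invoked by the stream when the consumer cancels.
		cancelReason = reason
	},
} )

async function cancelDemo(): Promise<{ first: string | undefined, doneAfterCancel: boolean }> {
	const reader = endless.getReader()
	const first = await reader.read()
	// Cancelling closes the stream from the consumer side.
	await reader.cancel( 'Reading no longer needed' )
	// Reads issued after cancellation resolve with `done: true`.
	const next = await reader.read()
	reader.releaseLock()
	return { first: first.value, doneAfterCancel: next.done }
}
```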
The StreamReader.generatorToReadableStream()
method is a utility function that converts a Generator or AsyncGenerator to a ReadableStream.
Parameters
Parameter | Type | Default | Description |
---|---|---|---|
generator | StreamGenerator<T> | StreamGenerator<unknown> | The Generator or AsyncGenerator to convert. |
Type Parameters
Parameter | Type | Default | Description |
---|---|---|---|
T | T | unknown | The type of data produced by the iterator. |
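One plausible way to perform such a conversion with the standard `ReadableStream` constructor is to pull one value from the generator each time the stream asks for more data. The following is a sketch under that assumption, not the library's source; `StreamGeneratorSketch`, `generatorToStreamSketch`, `countTo` and `drain` are hypothetical names.

```typescript
// Assumed shape, mirroring the documented StreamGenerator<T> description.
type StreamGeneratorSketch<T = unknown> = Generator<T> | AsyncGenerator<T>

function generatorToStreamSketch<T>( generator: StreamGeneratorSketch<T> ): ReadableStream<T> {
	return new ReadableStream<T>( {
		async pull( controller ) {
			// `await` covers both synchronous and asynchronous generators.
			const { done, value } = await generator.next()
			if ( done ) {
				controller.close()
				return
			}
			controller.enqueue( value )
		},
		async cancel() {
			// Give the generator a chance to run its `finally` blocks.
			await generator.return( undefined )
		},
	} )
}

// Example generator producing 1..limit.
function* countTo( limit: number ): Generator<number> {
	for ( let n = 1; n <= limit; n++ ) yield n
}

// Reads a stream to completion and returns its chunks.
async function drain<T>( stream: ReadableStream<T> ): Promise<T[]> {
	const reader = stream.getReader()
	const chunks: T[] = []
	while ( true ) {
		const result = await reader.read()
		if ( result.done ) break
		chunks.push( result.value )
	}
	reader.releaseLock()
	return chunks
}
```

Using `pull` rather than enqueuing everything in `start` keeps the generator lazy: values are produced only as fast as the consumer reads them.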
The StreamReader
class extends the EventEmitter
class, providing events for stream lifecycle management and error handling.
Events list
| Event | Arguments | Type | Description |
|---|---|---|---|
| data | | | Emitted when a chunk of data is read from the stream and processed by the optional transform function. |
| | chunk | ReadChunk<O> | The chunk of data read from the stream. |
| close | | | Emitted when the stream is closed. |
| | chunks | ReadChunks<O> | An array of chunks read from the stream before it was closed. |
| error | | | Emitted when an error occurs during reading. |
| | error | Error | The error that occurred during the reading process. |
| cancel | | | Emitted when the reading process is canceled. |
| | reason | DOMException | A DOMException explaining the reason for aborting the operation. |
Examples
const reader = new StreamReader( ... )
reader.on( 'data', chunk => {
console.log( 'received chunk', chunk )
} )
const reader = new StreamReader( ... )
reader.on( 'close', chunks => {
console.log( 'chunks', chunks )
} )
const reader = new StreamReader( ... )
reader.on( 'error', error => {
console.error( error )
} )
const reader = new StreamReader( ... )
reader.on( 'cancel', reason => {
console.log( 'reading cancelled', reason.message )
} )
In the following examples we reference streamData
which is an async function that writes data and closes the stream once finished:
const sleep = ( ms: number ) => new Promise<void>( resolve => setTimeout( resolve, ms ) )
const defaultChunks = [ 'data 1', 'data 2' ]
const erroredChunks = [ 'data 1', new Error( 'Test Error' ), 'data 2' ]
const streamData = async (
{ writer, chunks }: {
writer: WritableStreamDefaultWriter<Buffer>
chunks?: ( string | Error )[]
}
) => {
chunks ||= defaultChunks
for await ( const chunk of chunks ) {
if ( chunk instanceof Error ) {
throw chunk
}
await writer.write( Buffer.from( chunk ) )
await sleep( 50 )
}
await writer.close()
writer.releaseLock()
}
Basic usage
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable )
streamData( { writer } )
const chunks = await reader.read()
Reading chunk by chunk from a Response Body
const response = await fetch( ... )
let resourceSize = 0
if ( response.body ) {
const reader = new StreamReader( response.body )
const decoder = new TextDecoder()
reader.on( 'data', chunk => {
const decoded = decoder.decode( chunk, { stream: true } )
resourceSize += chunk.BYTES_PER_ELEMENT * chunk.length
} )
const chunks = await reader.read()
}
Transforming read chunks
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader<Buffer, string>( stream.readable, {
transform( chunk ) {
return chunk.toString( 'base64url' )
}
} )
streamData( { writer } )
reader.on( 'data', chunk => {
console.log( chunk ) // chunk is now a base64url string
} )
const chunks = await reader.read() // `string[]`
Opting-out from in-memory chunk collection
const inMemory = false
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable, { inMemory } )
streamData( { writer } )
reader.on( 'data', chunk => {
console.log( chunk )
} )
const chunks = await reader.read() // empty `[]`
Cancelling the reader before Stream is closed
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable )
streamData( { writer } )
reader.read()
cancelButton.addEventListener( 'click', () => {
reader.cancel( 'Reading no longer needed' )
} )
When an error occurs while reading stream data (such as an unexpected stream abort), the StreamReader
uses an internal error
function which handles the thrown Error.
By default, if no listener is attached to the error
event, the StreamReader.read()
method will re-throw the caught error.
In that case, you need to await the StreamReader.read()
method call and wrap it in a try...catch
block like so:
try {
const chunks = await reader.read()
} catch ( err ) {
const error = err as Error
console.error( 'An error occurred', error.message )
}
With an error
event listener:
reader.read()
reader.on( 'error', error => {
console.error( 'An error occurred', error.message )
} )
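The "re-throw unless an error listener is attached" rule can be illustrated with a very small class. This sketch is an assumption about the general pattern only: `ErrorAwareReaderSketch`, `onError` and `failingStream` are hypothetical names, the hand-rolled listener list stands in for a real EventEmitter, and what the library returns after emitting an error may differ.

```typescript
type ErrorListener = ( error: Error ) => void

class ErrorAwareReaderSketch {
	private listeners: ErrorListener[] = []

	// Registers a listener, similar in spirit to `on( 'error', ... )`.
	onError( listener: ErrorListener ): this {
		this.listeners.push( listener )
		return this
	}

	async read( stream: ReadableStream<string> ): Promise<string[]> {
		const reader = stream.getReader()
		const chunks: string[] = []
		try {
			while ( true ) {
				const result = await reader.read()
				if ( result.done ) break
				chunks.push( result.value )
			}
		} catch ( err ) {
			const error = err instanceof Error ? err : new Error( String( err ) )
			// No listener attached: surface the failure to the caller.
			if ( this.listeners.length === 0 ) throw error
			// Otherwise hand the error to every listener instead of throwing.
			for ( const listener of this.listeners ) listener( error )
		} finally {
			reader.releaseLock()
		}
		return chunks
	}
}

// Hypothetical source that delivers one chunk and then errors.
function failingStream(): ReadableStream<string> {
	let pulls = 0
	return new ReadableStream<string>( {
		pull( controller ) {
			if ( pulls++ === 0 ) controller.enqueue( 'data 1' )
			else controller.error( new Error( 'Test Error' ) )
		},
	} )
}
```

With this pattern a caller chooses one style: either await the read inside a try...catch block, or attach a listener and let it receive the error.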
Represents a chunk of data that can be read, which can either be of type T or a promise that resolves to T.
- Type Parameter:
  - T: The type of the data chunk. Defaults to unknown if not specified.
Represents an array of ReadChunk objects.
- Type Parameter:
  - T: The type of data contained in each ReadChunk.
A function that transforms a chunk of data.
- Type Parameters:
  - I: The type of the input chunk. Defaults to unknown.
  - O: The type of the output chunk. Defaults to I.
- Parameters:
  - chunk: The chunk of data to be transformed.
- Returns:
  A transformed chunk of data, which can be either a synchronous result or a promise that resolves to the result.
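Read literally, the description above suggests a function type along the following lines. The alias is reconstructed from the prose and is an assumption, not a copy of the library's declaration; `TransformChunkSketch`, `toUpper`, `toLength` and `applyTransform` are hypothetical names.

```typescript
// Assumed shape: input defaults to unknown, output defaults to the input type.
type TransformChunkSketch<I = unknown, O = I> = ( chunk: I ) => O | Promise<O>

// Synchronous transform keeping the same type.
const toUpper: TransformChunkSketch<string> = chunk => chunk.toUpperCase()

// Asynchronous transform changing the chunk type.
const toLength: TransformChunkSketch<string, number> = async chunk => chunk.length

// Awaiting the result handles both synchronous and promise-returning transforms.
async function applyTransform<I, O>( chunk: I, transform: TransformChunkSketch<I, O> ): Promise<O> {
	return await transform( chunk )
}
```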
Defines event types emitted by the StreamReader.
- Type Parameter:
  - O: The type of data being read from the stream and optionally transformed before the event is emitted.
- Event Types:
  - data: Emitted when a chunk of data is read.
    - Parameters: chunk (ReadChunk<O>)
  - close: Emitted when the stream is closed.
    - Parameters: chunks (ReadChunks<O>)
  - error: Emitted when an error occurs during reading.
    - Parameters: error (Error)
  - cancel: Emitted when the reading process is canceled.
    - Parameters: reason (DOMException)
A listener function for events emitted by the StreamReader.
- Type Parameters:
  - K: The event type to listen for.
  - O: The type of data being read from the stream.
- Parameters:
  - ...args: The arguments emitted with the event, based on the event type K.
Listener for the data event.
- Type Parameter:
  - O: The type of data being read.
- Parameters:
  - chunk (ReadChunk<O>): The chunk of data that was read.
Listener for the close event.
- Type Parameter:
  - O: The type of data being read.
- Parameters:
  - chunks (ReadChunks<O>): An array of chunks read before the stream was closed.
Listener for the cancel event.
- Parameters:
  - reason (DOMException): Explains the reason for aborting the operation.
Listener for the error event.
- Parameters:
  - error (Error): The error that occurred during reading.
A generator that produces chunks of data asynchronously. It can be either a regular generator or an async generator.
- Type Parameter:
  - T: The type of data produced by the generator.
npm install
or using pnpm
pnpm i
Run the following command to build code for distribution.
pnpm build
Check for warnings and errors.
pnpm lint
Run all the defined test suites by running the following:
# Run tests and watch file changes.
pnpm test
# Run tests and watch file changes with jest-environment-jsdom.
pnpm test:jsdom
# Run tests in a CI environment.
pnpm test:ci
# Run tests in a CI environment with jest-environment-jsdom.
pnpm test:ci:jsdom
You can also run specific suites like so:
pnpm test:jest
pnpm test:jest:jsdom
Run tests with coverage.
An HTTP server is then started to serve coverage files from the ./coverage
folder.
pnpm test:coverage
Contributions are truly welcome!
Please refer to the Contributing Doc for more information on how to start contributing to this project.
If you believe you have found a security vulnerability, we encourage you to responsibly disclose this and NOT open a public issue. We will investigate all legitimate reports. Email security@alessiofrittoli.it
to disclose any security vulnerabilities.