Skip to content

Commit

Permalink
chore: improved optional transform callback management
Browse files Browse the repository at this point in the history
  • Loading branch information
alessiofrittoli committed Jan 18, 2025
1 parent e48a85f commit bce18a9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 47 deletions.
59 changes: 31 additions & 28 deletions __tests__/reader.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ describe( 'StreamReader', () => {
jest.restoreAllMocks()
} )

it( 'emit \'read\' Event when chunk is received', async () => {

it( 'emit \'data\' Event when chunk is received', async () => {
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable )
Expand All @@ -51,6 +52,33 @@ describe( 'StreamReader', () => {
} )


it( 'allows chunk by chunk transformation', async () => {

const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader<Buffer, string>( stream.readable, {
transform: String,
} )

streamData( { writer } )

reader.on( 'data', chunk => {
expect( typeof chunk ).toBe( 'string' )
} )

const dataRead = reader.read()

await expect( dataRead ).resolves.toBeInstanceOf( Array )

const chunks = ( await dataRead ).map( chunk => {
expect( typeof chunk ).toBe( 'string' )
return chunk
} )
expect( chunks ).toEqual( defaultChunks )

} )


it( 'emit \'close\' Event when stream writer get closed', async () => {
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
Expand Down Expand Up @@ -88,7 +116,7 @@ describe( 'StreamReader', () => {
} )


it( 'removes \'read\' and \'close\' listeners on close', async () => {
it( 'removes \'data\' and \'close\' listeners on close', async () => {
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable )
Expand Down Expand Up @@ -138,7 +166,7 @@ describe( 'StreamReader', () => {
} )


it( 'removes \'read\' and \'close\' listeners when Error occures', async () => {
it( 'removes \'data\' and \'close\' listeners when Error occures', async () => {
const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader( stream.readable )
Expand Down Expand Up @@ -273,31 +301,6 @@ describe( 'StreamReader', () => {
await expect( streamPromise ).rejects.toThrow( 'Streming reader cancelled.' )
} )


it( 'allows chunk by chunk transformation', async () => {

const stream = new TransformStream<Buffer, Buffer>()
const writer = stream.writable.getWriter()
const reader = new StreamReader<Buffer, string>( stream.readable )

streamData( { writer } )

reader.on( 'data', chunk => {
expect( typeof chunk ).toBe( 'string' )
} )

const dataRead = reader.read( String )

await expect( dataRead ).resolves.toBeInstanceOf( Array )

const chunks = ( await dataRead ).map( chunk => {
expect( typeof chunk ).toBe( 'string' )
return chunk
} )
expect( chunks ).toEqual( defaultChunks )

} )

} )


Expand Down
27 changes: 15 additions & 12 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { EventEmitter } from '@alessiofrittoli/event-emitter'

import { generatorToReadableStream } from './utils'
import type { ReadChunk, ReadChunks, StreamReaderEvents, TransformChunk } from './types'
import type { ReadChunk, ReadChunks, StreamReaderEvents, Options } from './types'


/**
Expand Down Expand Up @@ -35,39 +35,42 @@ export class StreamReader<I = unknown, O = I> extends EventEmitter<StreamReaderE
* @private
*/
private receivedChunks: ReadChunks<O>
private transform: Options<I, O>[ 'transform' ]


/**
* Creates an instance of `StreamReader`.
* @param stream The input `ReadableStream<T>` to read data from.
*/
constructor( stream: ReadableStream<I> )
constructor( stream: ReadableStream<I>, options?: Options<I, O> )
{
super( { captureRejections: true } )

this.reader = stream.getReader()
this.closed = false
this.receivedChunks = []
this.transform = options?.transform
this.reader = stream.getReader()
this.closed = false
this.receivedChunks = []
}


/**
* Asynchronously reads on-demand stream data.
*
* Optionally transform each chunk using the provided transform function.
* Emits a 'data' event for each chunk after it has been processed.
* If an error occurs during the reading process, it is caught and passed to the `error` method.
*
* @template I - The type of the input chunks.
* @template O - The type of the output chunks after transformation.
* @param transform - (Optional) A function that transforms each chunk.
* @returns A new Promise that resolves to an array of processed chunks.
* @returns A new Promise that resolves to an array of processed chunks if the given `options.inMemory` is `true`.
*/
async read( transform?: TransformChunk<I, O> )
async read()
{
try {
for await ( const chunk of this.readChunks() ) {
const processed = ( typeof transform === 'function' ? await transform( chunk ) : chunk ) as ReadChunk<O>
const processed = (
this.transform
? await this.transform( chunk )
: chunk as ReadChunk<O>
)

this.receivedChunks.push( processed )
this.emit( 'data', processed )
}
Expand Down
27 changes: 20 additions & 7 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,22 @@ export type ReadChunk<T = unknown> = T | Awaited<T>
/**
* Represents an Array of `ReadChunk` objects.
*
* @template T - The type of data contained in each `ReadChunk`.
* @template O - The type of data contained in each `ReadChunk`.
*/
export type ReadChunks<T = unknown> = ReadChunk<T>[]
export type ReadChunks<O = unknown> = ReadChunk<O>[]


/**
* Custom additional options.
*
* @template I The type of the input chunk. Defaults to `unknown`.
* @template O The type of the output chunk. Defaults to `I`.
*/
export interface Options<I = unknown, O = I>
{
/** A function that transforms a chunk of data. */
transform?: TransformChunk<I, O>
}


/**
Expand All @@ -26,7 +39,7 @@ export type ReadChunks<T = unknown> = ReadChunk<T>[]
* @param chunk - The chunk of data to be transformed.
* @returns The transformed chunk of data, which can be either a synchronous result or a Promise that resolves to the result.
*/
export type TransformChunk<I = unknown, O = unknown> = ( chunk: ReadChunk<I> ) => ( ReadChunk<O> | Promise<ReadChunk<O>> )
export type TransformChunk<I = unknown, O = I> = ( chunk: ReadChunk<I> ) => ( ReadChunk<O> | PromiseLike<ReadChunk<O>> )


/**
Expand All @@ -36,28 +49,28 @@ export type TransformChunk<I = unknown, O = unknown> = ( chunk: ReadChunk<I> ) =
export type StreamReaderEvents<O = unknown> = {
/**
* Emitted when a chunk of data is read from the stream.
* @param {ReadChunk<O>} chunk - The chunk of data read from the stream.
* @param chunk The chunk of data read from the stream.
*/
data: [ chunk: ReadChunk<O> ]


/**
* Emitted when the stream is closed.
* @param {ReadChunks<O>} chunks - An array of chunks read from the stream before it was closed.
* @param chunks An array of chunks read from the stream before it was closed.
*/
close: [ chunks: ReadChunks<O> ]


/**
* Emitted when an error occurs during reading.
* @param {Error} error - The error that occurred during the reading process.
* @param error The error that occurred during the reading process.
*/
error: [ error: Error ]


/**
* Emitted when the reading process is canceled.
* @param {DOMException} reason - A DOMException explaing the reason for aborting the operation.
* @param reason A DOMException explaing the reason for aborting the operation.
*/
cancel: [ reason: DOMException ]
}
Expand Down

0 comments on commit bce18a9

Please sign in to comment.