Skip to content

Commit

Permalink
fix(interceptTransfer): rename reworkers to operators
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanGerbeth committed Jan 16, 2025
1 parent 5ca78e4 commit 36bd4ea
Showing 1 changed file with 13 additions and 13 deletions.
26 changes: 13 additions & 13 deletions packages/operators/src/transfer/interceptTransfer.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { concatMap, from, map, of } from 'rxjs';

import { readBytes } from './utils';

export const interceptTransfer = (reworkers = [], chunkSize = 60 * 1024) => {
export const interceptTransfer = (operators = [], chunkSize = 60 * 1024) => {
return source =>
source.pipe(
concatMap(requestResponse => {
if (reworkers.length) {
if (operators.length) {
return of(requestResponse).pipe(
sourceToStream(),
interceptStream(reworkers, chunkSize),
interceptStream(operators, chunkSize),
streamToSource(requestResponse)
);
}
Expand All @@ -28,7 +28,7 @@ const streamToSource = reqResp => {
source.pipe(concatMap(stream => streamToObjectMap.get(reqResp.constructor)(stream, reqResp)));
};

const interceptStream = (reworkers, chunkSize) => {
const interceptStream = (operators, chunkSize) => {
return source =>
source.pipe(
map(({ stream, total }) => {
Expand All @@ -44,9 +44,9 @@ const interceptStream = (reworkers, chunkSize) => {
const { done, value } = await this.chunks.next();
try {
if (done) {
return onStreamEnd(controller, reworkers);
return onStreamEnd(controller, operators);
}
await onStreamPull(controller, reworkers, value, total, this.time);
await onStreamPull(controller, operators, value, total, this.time);
} catch (err) {
onStreamError(err);
throw err;
Expand Down Expand Up @@ -103,17 +103,17 @@ const convertStreamToResponse = (stream, resp) => {
);
};

const onStreamPull = async (controller, reworkers, value, total, period) => {
controller.enqueue(new Uint8Array(value));
reworkers.map(reworker => reworker.next({ value, total, period }));
const onStreamPull = async (controller, operators, bytes, total, period) => {
controller.enqueue(new Uint8Array(bytes));
operators.map(operator => operator.next({ value: bytes, total, period }));
};

const onStreamEnd = (controller, reworkers) => {
const onStreamEnd = (controller, operators) => {
controller.close();
reworkers.map(reworker => reworker.complete());
operators.map(operator => operator.complete());
return;
};

const onStreamError = (reworkers, err) => {
reworkers.map(reworker => reworker.error(err));
const onStreamError = (operators, err) => {
operators.map(operator => operator.error(err));
};

0 comments on commit 36bd4ea

Please sign in to comment.