Skip to content

Commit

Permalink
feat(sandbox): add support for getChildrenValues
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Jan 21, 2025
1 parent 490d902 commit 52801b8
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 37 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
"pretty:quick": "pretty-quick --ignore-path ./.eslintignore --staged",
"semantic-release": "semantic-release",
"semantic-release-prepare": "ts-node tools/semantic-release-prepare",
"test": "ts-mocha -p tsconfig-cjs.json --config ./.mocharc.js",
"test": "NODE_ENV=test ts-mocha -p tsconfig-cjs.json --config ./.mocharc.js",
"test:watch": "ts-mocha -p tsconfig-cjs.json --paths 'tests/test_*.ts' -w --watch-extensions ts",
"transform:commands": "node ./scripts/commandTransform.js ./rawScripts ./src/scripts",
"tsc": "tsc",
Expand Down
58 changes: 52 additions & 6 deletions src/classes/child-processor.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ParentCommand } from '../enums';
import { SandboxedJob } from '../interfaces';
import { SandboxedJob, Receiver } from '../interfaces';
import { JobJsonSandbox } from '../types';
import { errorToJSON } from '../utils';

Expand All @@ -10,6 +10,8 @@ enum ChildStatus {
Errored,
}

const RESPONSE_TIMEOUT = process.env.NODE_ENV === 'test' ? 100 : 5_000;

/**
* ChildProcessor
*
Expand All @@ -22,7 +24,10 @@ export class ChildProcessor {
public processor: any;
public currentJobPromise: Promise<unknown> | undefined;

constructor(private send: (msg: any) => Promise<void>) {}
constructor(
private send: (msg: any) => Promise<void>,
private receiver: Receiver,
) {}

public async init(processorFile: string): Promise<void> {
let processor;
Expand Down Expand Up @@ -120,7 +125,7 @@ export class ChildProcessor {
opts: job.opts,
returnValue: JSON.parse(job.returnvalue || '{}'),
/*
* Emulate the real job `updateProgress` function, should works as `progress` function.
* Proxy `updateProgress` function, should works as `progress` function.
*/
async updateProgress(progress: number | object) {
// Locally store reference to new progress value
Expand All @@ -133,7 +138,7 @@ export class ChildProcessor {
});
},
/*
* Emulate the real job `log` function.
* Proxy job `log` function.
*/
log: async (row: any) => {
await send({
Expand All @@ -142,7 +147,7 @@ export class ChildProcessor {
});
},
/*
* Emulate the real job `moveToDelayed` function.
* Proxy `moveToDelayed` function.
*/
moveToDelayed: async (timestamp: number, token?: string) => {
await send({
Expand All @@ -151,7 +156,7 @@ export class ChildProcessor {
});
},
/*
* Emulate the real job `updateData` function.
* Proxy `updateData` function.
*/
updateData: async (data: any) => {
await send({
Expand All @@ -160,8 +165,49 @@ export class ChildProcessor {
});
wrappedJob.data = data;
},

/**
* Proxy `getChildrenValues` function.
*/
getChildrenValues: async () => {
const requestId = Math.random().toString(36).substring(2, 15);
await send({
requestId,
cmd: ParentCommand.GetChildrenValues,
});

return waitResponse(
requestId,
this.receiver,
RESPONSE_TIMEOUT,
'getChildrenValues',
);
},
};

return wrappedJob;
}
}

const waitResponse = async (
requestId: string,
receiver: Receiver,
timeout: number,
cmd: string,
) => {
return new Promise((resolve, reject) => {
const listener = (msg: { requestId: string; value: any }) => {
if (msg.requestId === requestId) {
resolve(msg.value);
receiver.off('message', listener);
}
};
receiver.on('message', listener);

setTimeout(() => {
receiver.off('message', listener);

reject(new Error(`TimeoutError: ${cmd} timed out in (${timeout}ms)`));
}, timeout);
});
};
8 changes: 3 additions & 5 deletions src/classes/main-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@
import { ChildProcessor } from './child-processor';
import { ParentCommand, ChildCommand } from '../enums';
import { errorToJSON, toString } from '../utils';
import { Receiver } from '../interfaces';

export default (
send: (msg: any) => Promise<void>,
receiver: { on: (evt: 'message', cb: (msg: any) => void) => void },
) => {
const childProcessor = new ChildProcessor(send);
export default (send: (msg: any) => Promise<void>, receiver: Receiver) => {
const childProcessor = new ChildProcessor(send, receiver);

receiver?.on('message', async msg => {
try {
Expand Down
64 changes: 39 additions & 25 deletions src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,46 @@ const sandbox = <T, R, N extends string>(
child.on('exit', exitHandler);

msgHandler = async (msg: ChildMessage) => {
switch (msg.cmd) {
case ParentCommand.Completed:
resolve(msg.value);
break;
case ParentCommand.Failed:
case ParentCommand.Error: {
const err = new Error();
Object.assign(err, msg.value);
reject(err);
break;
try {
switch (msg.cmd) {
case ParentCommand.Completed:
resolve(msg.value);
break;
case ParentCommand.Failed:
case ParentCommand.Error: {
const err = new Error();
Object.assign(err, msg.value);
reject(err);
break;
}
case ParentCommand.Progress:
await job.updateProgress(msg.value);
break;
case ParentCommand.Log:
await job.log(msg.value);
break;
case ParentCommand.MoveToDelayed:
await job.moveToDelayed(
msg.value?.timestamp,
msg.value?.token,
);
break;
case ParentCommand.Update:
await job.updateData(msg.value);
break;
case ParentCommand.GetChildrenValues:
{
const value = await job.getChildrenValues();
child.send({
requestId: msg.requestId,
cmd: ChildCommand.GetChildrenValuesResponse,
value,
});
}
break;
}
case ParentCommand.Progress:
await job.updateProgress(msg.value);
break;
case ParentCommand.Log:
await job.log(msg.value);
break;
case ParentCommand.MoveToDelayed:
await job.moveToDelayed(
msg.value?.timestamp,
msg.value?.token,
);
break;
case ParentCommand.Update:
await job.updateData(msg.value);
break;
} catch (err) {
reject(err);
}
};

Expand Down
1 change: 1 addition & 0 deletions src/enums/child-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ export enum ChildCommand {
Init,
Start,
Stop,
GetChildrenValuesResponse,
}
1 change: 1 addition & 0 deletions src/enums/parent-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ export enum ParentCommand {
MoveToDelayed,
Progress,
Update,
GetChildrenValues,
}
1 change: 1 addition & 0 deletions src/interfaces/child-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { ParentCommand } from '../enums/parent-command';

export interface ChildMessage {
cmd: ParentCommand;
requestId?: string;
value?: any;
err?: Record<string, any>;
}
1 change: 1 addition & 0 deletions src/interfaces/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ export * from './sandboxed-job';
export * from './sandboxed-options';
export * from './worker-options';
export * from './telemetry';
export * from './receiver';
4 changes: 4 additions & 0 deletions src/interfaces/receiver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export interface Receiver {
on: (evt: 'message', cb: (msg: any) => void) => void;
off: (evt: 'message', cb: (msg: any) => void) => void;
}
10 changes: 10 additions & 0 deletions tests/fixtures/fixture_processor_get_children_values.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

module.exports = async function (job) {
const values = await job.getChildrenValues();
return values;
};
9 changes: 9 additions & 0 deletions tests/fixtures/fixture_processor_get_children_values_child.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* A processor file to be used in tests.
*
*/
'use strict';

module.exports = function (job) {
return { childResult: 'bar' };
};
113 changes: 113 additions & 0 deletions tests/test_sandboxed_process.ts
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,119 @@ function sandboxProcessTests(
await worker.close();
});

it('can get children values by calling getChildrenValues', async () => {
const childJobId = 'child-job-id';
const childProcessFile =
__dirname + '/fixtures/fixture_processor_get_children_values_child.js';
const parentProcessFile =
__dirname + '/fixtures/fixture_processor_get_children_values.js';
const parentQueueName = `parent-queue-${v4()}`;

const parentWorker = new Worker(parentQueueName, parentProcessFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads,
});

const childWorker = new Worker(queueName, childProcessFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads,
});

const parentCompleting = new Promise<void>((resolve, reject) => {
parentWorker.on('completed', async (job: Job, value: any) => {
try {
expect(value).to.be.eql({
[`bull:${queueName}:${childJobId}`]: { childResult: 'bar' },
});
await parentWorker.close();
resolve();
} catch (err) {
await parentWorker.close();
reject(err);
}
});
});

const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
opts: { jobId: 'job-id' },
children: [
{ name: 'child-job', queueName, opts: { jobId: childJobId } },
],
});

await parentCompleting;
await parentWorker.close();
await childWorker.close();
await flow.close();
});

it('will fail job if calling getChildrenValues is too slow', async () => {
// Mockup Job.getChildrenValues to be slow
const getChildrenValues = Job.prototype.getChildrenValues;
Job.prototype.getChildrenValues = async function () {
await delay(50000);
return getChildrenValues.call(this);
};

const childJobId = 'child-job-id';
const childProcessFile =
__dirname + '/fixtures/fixture_processor_get_children_values_child.js';
const parentProcessFile =
__dirname + '/fixtures/fixture_processor_get_children_values.js';
const parentQueueName = `parent-queue-${v4()}`;

const parentWorker = new Worker(parentQueueName, parentProcessFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads,
});

const childWorker = new Worker(queueName, childProcessFile, {
connection,
prefix,
drainDelay: 1,
useWorkerThreads,
});

const parentFailing = new Promise<void>((resolve, reject) => {
parentWorker.on('failed', async (_, error: Error) => {
try {
expect(error.message).to.be.eql(
'TimeoutError: getChildrenValues timed out in (100ms)',
);
await parentWorker.close();
resolve();
} catch (err) {
await parentWorker.close();
reject(err);
}
});
});

const flow = new FlowProducer({ connection, prefix });
await flow.add({
name: 'parent-job',
queueName: parentQueueName,
opts: { jobId: 'job-id' },
children: [
{ name: 'child-job', queueName, opts: { jobId: childJobId } },
],
});

await parentFailing;
await parentWorker.close();
await childWorker.close();
await flow.close();
});

it('should process and move to delayed', async () => {
const processFile =
__dirname + '/fixtures/fixture_processor_move_to_delayed.js';
Expand Down

0 comments on commit 52801b8

Please sign in to comment.