Skip to content

Commit e19caf8

Browse files
committed
fix(child_process): pipe stdout and stderr to main thread
1 parent b7418b5 commit e19caf8

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

src/runtime/process-worker.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,16 @@ export default class ProcessWorker implements TinypoolWorker {
2626
options.argv,
2727
{
2828
...options,
29+
stdio: 'pipe',
2930
env: {
3031
...options.env,
3132
TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(),
3233
},
3334
}
3435
)
36+
this.process.stdout?.pipe(process.stdout)
37+
this.process.stderr?.pipe(process.stderr)
38+
3539
this.threadId = this.process.pid!
3640

3741
this.process.on('exit', this.onUnexpectedExit)
@@ -54,6 +58,8 @@ export default class ProcessWorker implements TinypoolWorker {
5458
this.process.kill()
5559
await this.waitForExit
5660

61+
this.process.stdout?.unpipe(process.stdout)
62+
this.process.stderr?.unpipe(process.stderr)
5763
this.port?.close()
5864
clearTimeout(sigkillTimeout)
5965
}
@@ -136,6 +142,21 @@ export default class ProcessWorker implements TinypoolWorker {
136142
// This requires manual unreffing of its channel.
137143
this.process.channel?.unref()
138144

145+
if (hasUnref(this.process.stdout)) {
146+
this.process.stdout.unref()
147+
}
148+
149+
if (hasUnref(this.process.stderr)) {
150+
this.process.stderr.unref()
151+
}
152+
139153
return this.process.unref()
140154
}
141155
}
156+
157+
// unref is untyped for some reason
158+
function hasUnref(stream: null | object): stream is { unref: () => void } {
159+
return (
160+
stream != null && 'unref' in stream && typeof stream.unref === 'function'
161+
)
162+
}

test/fixtures/stdio.mjs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export default function run() {
2+
process.stdout.write('Worker message')
3+
process.stderr.write('Worker error')
4+
}

test/worker-stdio.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import * as path from 'node:path'
2+
import { fileURLToPath } from 'node:url'
3+
import { stripVTControlCharacters } from 'node:util'
4+
import { Tinypool } from 'tinypool'
5+
6+
const runtimes = ['worker_threads', 'child_process'] as const
7+
const __dirname = path.dirname(fileURLToPath(import.meta.url))
8+
9+
test.each(runtimes)(
10+
"worker's stdout and stderr are piped to main thread when { runtime: '%s' }",
11+
async (runtime) => {
12+
const pool = createPool({
13+
runtime,
14+
minThreads: 1,
15+
maxThreads: 1,
16+
})
17+
18+
const getStdout = captureStandardStream('stdout')
19+
const getStderr = captureStandardStream('stderr')
20+
21+
await pool.run({})
22+
23+
const stdout = getStdout()
24+
const stderr = getStderr()
25+
26+
expect(stdout).toMatch('Worker message')
27+
28+
expect(stderr).toMatch('Worker error')
29+
}
30+
)
31+
32+
function createPool(options: Partial<Tinypool['options']>) {
33+
const pool = new Tinypool({
34+
filename: path.resolve(__dirname, 'fixtures/stdio.mjs'),
35+
minThreads: 1,
36+
maxThreads: 1,
37+
...options,
38+
})
39+
40+
return pool
41+
}
42+
43+
function captureStandardStream(type: 'stdout' | 'stderr') {
44+
const spy = vi.fn()
45+
46+
// eslint-disable-next-line @typescript-eslint/unbound-method
47+
const original = process[type].write
48+
process[type].write = spy
49+
50+
return function collect() {
51+
process[type].write = original
52+
return stripVTControlCharacters(
53+
spy.mock.calls.map((call) => call[0]).join('')
54+
)
55+
}
56+
}

0 commit comments

Comments
 (0)