Skip to content

Commit 67f3129

Browse files
committed
fix(child_process): pipe stdout and stderr to main thread
1 parent b7418b5 commit 67f3129

File tree

3 files changed

+84
-0
lines changed

3 files changed

+84
-0
lines changed

src/runtime/process-worker.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,19 @@ export default class ProcessWorker implements TinypoolWorker {
2626
options.argv,
2727
{
2828
...options,
29+
stdio: 'pipe',
2930
env: {
3031
...options.env,
3132
TINYPOOL_WORKER_ID: options.workerData[0].workerId.toString(),
3233
},
3334
}
3435
)
36+
37+
process.stdout.setMaxListeners(1 + process.stdout.getMaxListeners())
38+
process.stderr.setMaxListeners(1 + process.stderr.getMaxListeners())
39+
this.process.stdout?.pipe(process.stdout)
40+
this.process.stderr?.pipe(process.stderr)
41+
3542
this.threadId = this.process.pid!
3643

3744
this.process.on('exit', this.onUnexpectedExit)
@@ -54,6 +61,8 @@ export default class ProcessWorker implements TinypoolWorker {
5461
this.process.kill()
5562
await this.waitForExit
5663

64+
this.process.stdout?.unpipe(process.stdout)
65+
this.process.stderr?.unpipe(process.stderr)
5766
this.port?.close()
5867
clearTimeout(sigkillTimeout)
5968
}
@@ -136,6 +145,21 @@ export default class ProcessWorker implements TinypoolWorker {
136145
// This requires manual unreffing of its channel.
137146
this.process.channel?.unref()
138147

148+
if (hasUnref(this.process.stdout)) {
149+
this.process.stdout.unref()
150+
}
151+
152+
if (hasUnref(this.process.stderr)) {
153+
this.process.stderr.unref()
154+
}
155+
139156
return this.process.unref()
140157
}
141158
}
159+
160+
// unref is untyped for some reason
161+
function hasUnref(stream: null | object): stream is { unref: () => void } {
162+
return (
163+
stream != null && 'unref' in stream && typeof stream.unref === 'function'
164+
)
165+
}

test/fixtures/stdio.mjs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export default function run() {
2+
process.stdout.write('Worker message')
3+
process.stderr.write('Worker error')
4+
}

test/worker-stdio.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import * as path from 'node:path'
2+
import { fileURLToPath } from 'node:url'
3+
import { stripVTControlCharacters } from 'node:util'
4+
import { Tinypool } from 'tinypool'
5+
6+
const runtimes = ['worker_threads', 'child_process'] as const
7+
const __dirname = path.dirname(fileURLToPath(import.meta.url))
8+
9+
test.each(runtimes)(
10+
"worker's stdout and stderr are piped to main thread when { runtime: '%s' }",
11+
async (runtime) => {
12+
const pool = createPool({
13+
runtime,
14+
minThreads: 1,
15+
maxThreads: 1,
16+
})
17+
18+
const getStdout = captureStandardStream('stdout')
19+
const getStderr = captureStandardStream('stderr')
20+
21+
await pool.run({})
22+
23+
const stdout = getStdout()
24+
const stderr = getStderr()
25+
26+
expect(stdout).toMatch('Worker message')
27+
28+
expect(stderr).toMatch('Worker error')
29+
}
30+
)
31+
32+
function createPool(options: Partial<Tinypool['options']>) {
33+
const pool = new Tinypool({
34+
filename: path.resolve(__dirname, 'fixtures/stdio.mjs'),
35+
minThreads: 1,
36+
maxThreads: 1,
37+
...options,
38+
})
39+
40+
return pool
41+
}
42+
43+
function captureStandardStream(type: 'stdout' | 'stderr') {
44+
const spy = vi.fn()
45+
46+
// eslint-disable-next-line @typescript-eslint/unbound-method
47+
const original = process[type].write
48+
process[type].write = spy
49+
50+
return function collect() {
51+
process[type].write = original
52+
return stripVTControlCharacters(
53+
spy.mock.calls.map((call) => call[0]).join('')
54+
)
55+
}
56+
}

0 commit comments

Comments
 (0)