Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Buffering for non-tty output (non-interactive) in the vscode-integrated terminal #1957

Merged
merged 4 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/extension/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ export class RunmeExtension {
kernel
.runProgram('echo $SHELL')
.then((output) => {
if (output === false) {
if (output === undefined) {
return
}

Expand Down
19 changes: 9 additions & 10 deletions src/extension/kernel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1307,13 +1307,13 @@ export class Kernel implements Disposable {
return await this.reporter?.transform(input)
}

async runProgram(program?: RunProgramOptions | string) {
async runProgram(program?: RunProgramOptions | string): Promise<string | undefined> {
let programOptions: RunProgramOptions
const logger = getLogger('runProgram')

if (!this.runner) {
logger.error('No runner available')
return false
return
}

if (typeof program === 'object') {
Expand All @@ -1340,16 +1340,15 @@ export class Kernel implements Disposable {

this.context.subscriptions.push(programSession)

let execRes: string | undefined
const onData = (data: string | Uint8Array) => {
if (execRes === undefined) {
execRes = ''
}
execRes += data.toString()
const decoder = new TextDecoder()
let execRes = ''
const onData = (data: Uint8Array) => {
execRes += decoder.decode(data)
}

programSession.onDidWrite(onData)
programSession.onDidErr(onData)
// read raw directly from process instead of tty/pty
programSession.onStdoutRaw(onData)
programSession.onStderrRaw(onData)

const success = new Promise<boolean>((resolve, reject) => {
programSession.onDidClose(async (code) => {
Expand Down
75 changes: 56 additions & 19 deletions src/extension/runner/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
EventEmitter,
} from 'vscode'
import stripAnsi from 'strip-ansi'
import { Observable } from 'rxjs'
import { bufferTime, map, endWith, startWith } from 'rxjs/operators'

import type { DisposableAsync, Serializer } from '../../types'
import {
Expand Down Expand Up @@ -387,6 +389,7 @@ export default class GrpcRunner implements IRunner {
}
}

export const NON_TTY_BUFFER_SPAN_MS = 100
const log = getLogger('GrpcRunnerProgramSession')
export class GrpcRunnerProgramSession implements IRunnerProgramSession {
private disposables: Disposable[] = []
Expand Down Expand Up @@ -430,14 +433,45 @@ export class GrpcRunnerProgramSession implements IRunnerProgramSession {
) {
this.session = this.client.execute()

this.register(
this._onStdoutRaw.event((data) => {
// TODO: web compat
const stdout = Buffer.from(data).toString('utf-8')
this._onDidWrite.fire(stdout)
// unbufferd
const decoder = new TextDecoder()
let decodedStdout$: Observable<string> = new Observable<Uint8Array>((observer) => {
this.register(this._onStdoutRaw.event((bytes) => observer.next(bytes)))
this.register(
this._onDidClose.event(() => {
observer.complete()
this.dispose()
}),
)
this.register(
this._onInternalErr.event((err) => {
observer.error(err)
this.dispose()
}),
)
}).pipe(
map((bytes) => {
if (!this.isPseudoterminal()) {
return this.LFToCRLF(bytes)
}
return bytes
}),
map((bytes) => decoder.decode(bytes)),
)

// buffered
if (!this.isPseudoterminal()) {
decodedStdout$ = decodedStdout$.pipe(
startWith('\r'),
endWith('\r\n'),
bufferTime(NON_TTY_BUFFER_SPAN_MS),
map((chunks) => chunks.join('')),
)
}

const sub = decodedStdout$.subscribe((data) => this._onDidWrite.fire(data))
this.register({ dispose: () => sub.unsubscribe() })

this.register(
this._onStderrRaw.event((data) => {
// TODO: web compat
Expand All @@ -454,9 +488,6 @@ export class GrpcRunnerProgramSession implements IRunnerProgramSession {
}),
)

this.register(this._onDidClose.event(() => this.dispose()))
this.register(this._onInternalErr.event(() => this.dispose()))

let detectedMimeType = ''
this.session.responses.onMessage(({ stderrData, stdoutData, exitCode, pid, mimeType }) => {
if (mimeType) {
Expand Down Expand Up @@ -520,24 +551,30 @@ export class GrpcRunnerProgramSession implements IRunnerProgramSession {

protected write(channel: 'stdout' | 'stderr', mimeType: string, bytes: Uint8Array): void {
if (this.convertEol(mimeType) && !this.isPseudoterminal()) {
const newBytes = new Array(bytes.byteLength)
bytes = this.LFToCRLF(bytes)
}

let i = 0,
j = 0
while (j < bytes.byteLength) {
const byte = bytes[j++]
return this[GrpcRunnerProgramSession.WRITE_LISTENER[channel]].fire(bytes)
}

if (byte === 0x0a) {
newBytes[i++] = 0x0d
}
protected LFToCRLF(bytes: Uint8Array) {
const newBytes = new Array(bytes.byteLength)

newBytes[i++] = byte
let i = 0,
j = 0
while (j < bytes.byteLength) {
const byte = bytes[j++]

const prev = j - 1 >= 0 ? bytes[j - 1] : undefined
if (byte === 0x0a && prev !== 0x0d) {
newBytes[i++] = 0x0d
}

bytes = Buffer.from(newBytes)
newBytes[i++] = byte
}

return this[GrpcRunnerProgramSession.WRITE_LISTENER[channel]].fire(bytes)
bytes = Buffer.from(newBytes)
return bytes
}

protected async init(opts?: RunProgramOptions) {
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/specs/codelense.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ describe('Runme Codelense Support', async () => {

const workbench = await browser.getWorkbench()
const text = await getTerminalText(workbench)
expect(text).toContain('Hello World!\n ')
expect(text).toContain('Hello World!\n\n *')
await killAllTerminals(workbench)
})

Expand Down
19 changes: 16 additions & 3 deletions tests/extension/runner/runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { GrpcRunnerClient } from '../../../src/extension/runner/client'
import GrpcRunner, {
GrpcRunnerProgramSession,
IRunner,
NON_TTY_BUFFER_SPAN_MS,
RunProgramOptions,
} from '../../../src/extension/runner'
import { GrpcRunnerEnvironment } from '../../../src/extension/runner/environment'
Expand Down Expand Up @@ -539,6 +540,7 @@ suite('grpc runner', () => {

test('onDidWrite replaces returns in non-interactive', async () => {
const { duplex, writeListener } = await createNewSession({
tty: false,
convertEol: true,
})

Expand All @@ -548,27 +550,33 @@ suite('grpc runner', () => {
mimeType: 'text/plain',
})

await waitForBufferTimespan()
expect(writeListener).toBeCalledTimes(1)
expect(writeListener).toBeCalledWith('test\r\n')
expect(writeListener).toBeCalledWith('\rtest\r\r\n')
})

test('onDidWrite replaces returns in complex string in non-interactive', async () => {
const { duplex, writeListener } = await createNewSession({
tty: false,
convertEol: true,
})

duplex._onMessage.fire({
stdoutData: Buffer.from('SERVICE_FOO_TOKEN: foobar\nSERVICE_BAR_TOKEN: barfoo'),
stdoutData: Buffer.from('SERVICE_FOO_TOKEN: foobar\r\nSERVICE_BAR_TOKEN: barfoo'),
stderrData: Buffer.from(''),
mimeType: 'text/plain',
})

await waitForBufferTimespan()
expect(writeListener).toBeCalledTimes(1)
expect(writeListener).toBeCalledWith('SERVICE_FOO_TOKEN: foobar\r\nSERVICE_BAR_TOKEN: barfoo')
expect(writeListener).toBeCalledWith(
'\rSERVICE_FOO_TOKEN: foobar\r\r\r\nSERVICE_BAR_TOKEN: barfoo',
)
})

test('onDidWrite replaces returns in non-interactive in stderr', async () => {
const { duplex, errListener } = await createNewSession({
tty: false,
convertEol: true,
})

Expand Down Expand Up @@ -784,6 +792,10 @@ suite('RunmeCodeLensProvider', () => {
})
})

function waitForBufferTimespan() {
return new Promise((resolve) => setTimeout(resolve, 2 * NON_TTY_BUFFER_SPAN_MS))
}

function getMockedDuplex(session: GrpcRunnerProgramSession): MockedDuplexClientStream {
return session['session'] as unknown as MockedDuplexClientStream
}
Expand Down Expand Up @@ -820,6 +832,7 @@ async function createNewSession(
runner ??= generatedRunner
const session = (await runner.createProgramSession({
programName: 'sh',
tty: true,
...options,
})) as GrpcRunnerProgramSession

Expand Down