Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure streamed outputs are added to the right cell #16391

Merged
merged 5 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
"<node_internals>/**"
],
"env": {
// Disable this to turoff on redux & console logging during debugging
"VSC_JUPYTER_FORCE_LOGGING": "1",
// Enable this to console logging during debugging
"XVSC_JUPYTER_FORCE_LOGGING": "1",
// Enable this to log telemetry to the output during debugging
"XVSC_JUPYTER_LOG_TELEMETRY": "1",
// Enable this to log IPYWIDGET messages
Expand Down Expand Up @@ -191,7 +191,7 @@
"--extensionTestsPath=${workspaceFolder}/out/test/index.node.js"
],
"env": {
"VSC_JUPYTER_FORCE_LOGGING": "1",
"VSC_JUPYTER_FORCE_LOGGING": "",
"VSC_JUPYTER_CI_TEST_GREP": "", // Leave as `VSCode Notebook` to run only Notebook tests.
"VSC_JUPYTER_CI_TEST_INVERT_GREP": "", // Initialize this to invert the grep (exclude tests with value defined in grep).
"CI_PYTHON_PATH": "", // Update with path to real python interpereter used for testing.
Expand Down Expand Up @@ -231,7 +231,7 @@
"--extensionTestsPath=${workspaceFolder}/out/test/index.node.js"
],
"env": {
"VSC_JUPYTER_FORCE_LOGGING": "1",
"XVSC_JUPYTER_FORCE_LOGGING": "1",
"CI_PYTHON_PATH": "/usr/local/bin/python", // Update with path to real python interpereter used for testing.
"TEST_FILES_SUFFIX": "*.vscode.test,*.vscode.common.test",
"VSC_JUPYTER_REMOTE_NATIVE_TEST": "true", // Change to `true` to run the Native Notebook tests with remote jupyter connections.
Expand Down Expand Up @@ -266,7 +266,7 @@
],
"env": {
"VSC_JUPYTER_CI_TEST_GREP": "@nonPython",
"VSC_JUPYTER_FORCE_LOGGING": "1",
"XVSC_JUPYTER_FORCE_LOGGING": "1",
"VSC_JUPYTER_CI_RUN_NON_PYTHON_NB_TEST": "1",
"VSC_JUPYTER_CI_TEST_VSC_CHANNEL": "insiders",
"TEST_FILES_SUFFIX": "*.vscode.test,*.vscode.common.test",
Expand Down
51 changes: 29 additions & 22 deletions src/kernels/execution/cellExecutionMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,19 @@ export class CellExecutionMessageHandler implements IDisposable {
* Creating one results in side effects such as execution order getting reset and timers starting.
* Hence where possible re-use an existing cell execution task associated with this document.
*/
private createTemporaryTask() {
private createTemporaryTask(executionMustBelongToCurrentCell: boolean) {
if (this.cell.document.isClosed) {
return;
}
// If we have an active task, use that instead of creating a new task.
const existingTask = activeNotebookCellExecution.get(this.cell.notebook);
if (existingTask) {
return existingTask;
if (
!executionMustBelongToCurrentCell ||
(executionMustBelongToCurrentCell && existingTask.cell.index === this.cell.index)
) {
return existingTask;
}
}

// Create a temporary task.
Expand Down Expand Up @@ -645,11 +650,7 @@ export class CellExecutionMessageHandler implements IDisposable {
this.cell,
() => `Update output with mimes ${cellOutput.items.map((item) => item.mime).toString()}`
);
// Append to the data (we would push here but VS code requires a recreation of the array)
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
// Clear if necessary
this.clearOutputIfNecessary(task);
// Keep track of the display_id against the output item, we might need this to update this later.
Expand Down Expand Up @@ -940,6 +941,21 @@ export class CellExecutionMessageHandler implements IDisposable {
private handleStatusMessage(msg: KernelMessage.IStatusMsg) {
traceCellMessage(this.cell, `Kernel switching to ${msg.content.execution_state}`);
}

/**
* Possible execution of cell has completed (the task would have been disposed).
* This message could have come from a background thread.
* In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
*/
private getOrCreateExecutionTask(executionMustBelongToCurrentCell: boolean) {
if (!executionMustBelongToCurrentCell && this.execution) {
return this.execution;
}
return this.execution?.cell === this.cell
? this.execution
: this.createTemporaryTask(executionMustBelongToCurrentCell);
}

private handleStreamMessage(msg: KernelMessage.IStreamMsg) {
if (
getParentHeaderMsgId(msg) &&
Expand All @@ -959,10 +975,7 @@ export class CellExecutionMessageHandler implements IDisposable {

// eslint-disable-next-line complexity
traceCellMessage(this.cell, `Update streamed output, new output '${msg.content.text.substring(0, 100)}'`);
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);

const outputName =
msg.content.name === 'stdout'
Expand All @@ -987,7 +1000,7 @@ export class CellExecutionMessageHandler implements IDisposable {

// Clear output if waiting for a clear
const { previousValueOfClearOutputOnNextUpdateToOutput } = this.clearOutputIfNecessary(task);
// Ensure we append to previous output, only if the streams as the same &
// Ensure we append to previous output, only if the streams are the same &
// If the last output is the desired stream type.
if (this.lastUsedStreamOutput?.stream === msg.content.name) {
const output = cellOutputToVSCCellOutput({
Expand Down Expand Up @@ -1048,7 +1061,7 @@ export class CellExecutionMessageHandler implements IDisposable {
].clearOutputOnNextUpdateToOutput = true;
}
} else {
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
this.updateJupyterOutputWidgetWithOutput({ clearOutput: true }, task);
this.endTemporaryTask();
}
Expand All @@ -1059,11 +1072,7 @@ export class CellExecutionMessageHandler implements IDisposable {
if (msg.content.wait) {
this.clearOutputOnNextUpdateToOutput = true;
} else {
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
// Clear all outputs and start over again.
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
this.clearLastUsedStreamOutput();
task?.clearOutput().then(noop, noop);
this.endTemporaryTask();
Expand Down Expand Up @@ -1173,10 +1182,6 @@ export class CellExecutionMessageHandler implements IDisposable {
return;
}
}
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
traceCellMessage(this.cell, `Replace output items in display data ${newOutput.items.length}`);
if (outputMetadataHasChanged) {
// https://github.com/microsoft/vscode/issues/181369
Expand All @@ -1199,6 +1204,7 @@ export class CellExecutionMessageHandler implements IDisposable {
}
return o;
});
const task = this.getOrCreateExecutionTask(false);
task?.replaceOutput(newOutputs, outputToBeUpdated.cell).then(noop, noop);
CellOutputDisplayIdTracker.trackOutputByDisplayId(
outputToBeUpdated.cell,
Expand All @@ -1207,6 +1213,7 @@ export class CellExecutionMessageHandler implements IDisposable {
newOutput.items
);
} else {
const task = this.getOrCreateExecutionTask(false);
task?.replaceOutputItems(newOutput.items, outputToBeUpdated.outputContainer).then(noop, noop);
CellOutputDisplayIdTracker.trackOutputByDisplayId(
outputToBeUpdated.cell,
Expand Down
43 changes: 43 additions & 0 deletions src/test/datascience/notebook/executionService.vscode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,49 @@ suite('Kernel Execution @kernelCore', function () {
await waitForTextOutput(cell2, 'HI Y', 0, false);
await waitForTextOutput(cell2, 'HI Z', 1, false);
});

test('Streamed output is added into the right cell (#16381)', async function () {
// https://github.com/microsoft/vscode-jupyter/issues/16381#issuecomment-2603496123
const onDidChangeNbEventHandler = new EventEmitter<NotebookDocumentChangeEvent>();
const stub = sinon.stub(workspace, 'onDidChangeNotebookDocument');
stub.get(() => onDidChangeNbEventHandler.event);
disposables.push(onDidChangeNbEventHandler);
const cell = await notebook.appendCodeCell(
dedent`
import logging.handlers
import queue, logging
import sys
que=queue.Queue()
handler = logging.StreamHandler(sys.stdout)
listener = logging.handlers.QueueListener(que, handler)
queue_handler = logging.handlers.QueueHandler(que)
root = logging.getLogger()
root.addHandler(queue_handler)
logging.lastResort=None
listener.start()

root.warning("Look out!")`
);
kernelExecution.executeCell(cell).catch(noop);

await Promise.all([waitForTextOutput(cell, 'Look out!', 0, false)]);

const cell2 = await notebook.appendCodeCell(
dedent`
root.setLevel(logging.DEBUG)
root.debug('debug test')`
);
kernelExecution.executeCell(cell2).catch(noop);

await Promise.all([waitForTextOutput(cell, 'debug test', 1, false)]);

assert.strictEqual(cell2.outputs.length, 0);

// Clear the loggers, else rest of the tests can fail..
const cell3 = await notebook.appendCodeCell(`root.loggers.clear()`);
await kernelExecution.executeCell(cell3).catch(noop);
});

test('Clearing output while executing will ensure output is cleared', async function () {
let onDidChangeNbEventHandler = new EventEmitter<NotebookDocumentChangeEvent>();
const stub = sinon.stub(workspace, 'onDidChangeNotebookDocument');
Expand Down
Loading