Skip to content

Commit

Permalink
Ensure streamed outputs are added to the right cell (#16391)
Browse files Browse the repository at this point in the history
* Ensure streamed outputs are added to the right cell

* Fix linters

* Update comments

* Faster

* Fixes
  • Loading branch information
DonJayamanne authored Jan 21, 2025
1 parent f5fb0e0 commit 9cd4ddb
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 27 deletions.
10 changes: 5 additions & 5 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
"<node_internals>/**"
],
"env": {
// Disable this to turoff on redux & console logging during debugging
"VSC_JUPYTER_FORCE_LOGGING": "1",
// Enable this to console logging during debugging
"XVSC_JUPYTER_FORCE_LOGGING": "1",
// Enable this to log telemetry to the output during debugging
"XVSC_JUPYTER_LOG_TELEMETRY": "1",
// Enable this to log IPYWIDGET messages
Expand Down Expand Up @@ -191,7 +191,7 @@
"--extensionTestsPath=${workspaceFolder}/out/test/index.node.js"
],
"env": {
"VSC_JUPYTER_FORCE_LOGGING": "1",
"VSC_JUPYTER_FORCE_LOGGING": "",
"VSC_JUPYTER_CI_TEST_GREP": "", // Leave as `VSCode Notebook` to run only Notebook tests.
"VSC_JUPYTER_CI_TEST_INVERT_GREP": "", // Initialize this to invert the grep (exclude tests with value defined in grep).
"CI_PYTHON_PATH": "", // Update with path to real python interpereter used for testing.
Expand Down Expand Up @@ -231,7 +231,7 @@
"--extensionTestsPath=${workspaceFolder}/out/test/index.node.js"
],
"env": {
"VSC_JUPYTER_FORCE_LOGGING": "1",
"XVSC_JUPYTER_FORCE_LOGGING": "1",
"CI_PYTHON_PATH": "/usr/local/bin/python", // Update with path to real python interpereter used for testing.
"TEST_FILES_SUFFIX": "*.vscode.test,*.vscode.common.test",
"VSC_JUPYTER_REMOTE_NATIVE_TEST": "true", // Change to `true` to run the Native Notebook tests with remote jupyter connections.
Expand Down Expand Up @@ -266,7 +266,7 @@
],
"env": {
"VSC_JUPYTER_CI_TEST_GREP": "@nonPython",
"VSC_JUPYTER_FORCE_LOGGING": "1",
"XVSC_JUPYTER_FORCE_LOGGING": "1",
"VSC_JUPYTER_CI_RUN_NON_PYTHON_NB_TEST": "1",
"VSC_JUPYTER_CI_TEST_VSC_CHANNEL": "insiders",
"TEST_FILES_SUFFIX": "*.vscode.test,*.vscode.common.test",
Expand Down
51 changes: 29 additions & 22 deletions src/kernels/execution/cellExecutionMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,19 @@ export class CellExecutionMessageHandler implements IDisposable {
* Creating one results in side effects such as execution order getting reset and timers starting.
* Hence where possible re-use an existing cell execution task associated with this document.
*/
private createTemporaryTask() {
private createTemporaryTask(executionMustBelongToCurrentCell: boolean) {
if (this.cell.document.isClosed) {
return;
}
// If we have an active task, use that instead of creating a new task.
const existingTask = activeNotebookCellExecution.get(this.cell.notebook);
if (existingTask) {
return existingTask;
if (
!executionMustBelongToCurrentCell ||
(executionMustBelongToCurrentCell && existingTask.cell.index === this.cell.index)
) {
return existingTask;
}
}

// Create a temporary task.
Expand Down Expand Up @@ -645,11 +650,7 @@ export class CellExecutionMessageHandler implements IDisposable {
this.cell,
() => `Update output with mimes ${cellOutput.items.map((item) => item.mime).toString()}`
);
// Append to the data (we would push here but VS code requires a recreation of the array)
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
// Clear if necessary
this.clearOutputIfNecessary(task);
// Keep track of the display_id against the output item, we might need this to update this later.
Expand Down Expand Up @@ -940,6 +941,21 @@ export class CellExecutionMessageHandler implements IDisposable {
private handleStatusMessage(msg: KernelMessage.IStatusMsg) {
traceCellMessage(this.cell, `Kernel switching to ${msg.content.execution_state}`);
}

/**
* Possible execution of cell has completed (the task would have been disposed).
* This message could have come from a background thread.
* In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
*/
private getOrCreateExecutionTask(executionMustBelongToCurrentCell: boolean) {
if (!executionMustBelongToCurrentCell && this.execution) {
return this.execution;
}
return this.execution?.cell === this.cell
? this.execution
: this.createTemporaryTask(executionMustBelongToCurrentCell);
}

private handleStreamMessage(msg: KernelMessage.IStreamMsg) {
if (
getParentHeaderMsgId(msg) &&
Expand All @@ -959,10 +975,7 @@ export class CellExecutionMessageHandler implements IDisposable {

// eslint-disable-next-line complexity
traceCellMessage(this.cell, `Update streamed output, new output '${msg.content.text.substring(0, 100)}'`);
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);

const outputName =
msg.content.name === 'stdout'
Expand All @@ -987,7 +1000,7 @@ export class CellExecutionMessageHandler implements IDisposable {

// Clear output if waiting for a clear
const { previousValueOfClearOutputOnNextUpdateToOutput } = this.clearOutputIfNecessary(task);
// Ensure we append to previous output, only if the streams as the same &
// Ensure we append to previous output, only if the streams are the same &
// If the last output is the desired stream type.
if (this.lastUsedStreamOutput?.stream === msg.content.name) {
const output = cellOutputToVSCCellOutput({
Expand Down Expand Up @@ -1048,7 +1061,7 @@ export class CellExecutionMessageHandler implements IDisposable {
].clearOutputOnNextUpdateToOutput = true;
}
} else {
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
this.updateJupyterOutputWidgetWithOutput({ clearOutput: true }, task);
this.endTemporaryTask();
}
Expand All @@ -1059,11 +1072,7 @@ export class CellExecutionMessageHandler implements IDisposable {
if (msg.content.wait) {
this.clearOutputOnNextUpdateToOutput = true;
} else {
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
// Clear all outputs and start over again.
const task = this.execution || this.createTemporaryTask();
const task = this.getOrCreateExecutionTask(true);
this.clearLastUsedStreamOutput();
task?.clearOutput().then(noop, noop);
this.endTemporaryTask();
Expand Down Expand Up @@ -1173,10 +1182,6 @@ export class CellExecutionMessageHandler implements IDisposable {
return;
}
}
// Possible execution of cell has completed (the task would have been disposed).
// This message could have come from a background thread.
// In such circumstances, create a temporary task & use that to update the output (only cell execution tasks can update cell output).
const task = this.execution || this.createTemporaryTask();
traceCellMessage(this.cell, `Replace output items in display data ${newOutput.items.length}`);
if (outputMetadataHasChanged) {
// https://github.com/microsoft/vscode/issues/181369
Expand All @@ -1199,6 +1204,7 @@ export class CellExecutionMessageHandler implements IDisposable {
}
return o;
});
const task = this.getOrCreateExecutionTask(false);
task?.replaceOutput(newOutputs, outputToBeUpdated.cell).then(noop, noop);
CellOutputDisplayIdTracker.trackOutputByDisplayId(
outputToBeUpdated.cell,
Expand All @@ -1207,6 +1213,7 @@ export class CellExecutionMessageHandler implements IDisposable {
newOutput.items
);
} else {
const task = this.getOrCreateExecutionTask(false);
task?.replaceOutputItems(newOutput.items, outputToBeUpdated.outputContainer).then(noop, noop);
CellOutputDisplayIdTracker.trackOutputByDisplayId(
outputToBeUpdated.cell,
Expand Down
43 changes: 43 additions & 0 deletions src/test/datascience/notebook/executionService.vscode.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,49 @@ suite('Kernel Execution @kernelCore', function () {
await waitForTextOutput(cell2, 'HI Y', 0, false);
await waitForTextOutput(cell2, 'HI Z', 1, false);
});

test('Streamed output is added into the right cell (#16381)', async function () {
// https://github.com/microsoft/vscode-jupyter/issues/16381#issuecomment-2603496123
const onDidChangeNbEventHandler = new EventEmitter<NotebookDocumentChangeEvent>();
const stub = sinon.stub(workspace, 'onDidChangeNotebookDocument');
stub.get(() => onDidChangeNbEventHandler.event);
disposables.push(onDidChangeNbEventHandler);
const cell = await notebook.appendCodeCell(
dedent`
import logging.handlers
import queue, logging
import sys
que=queue.Queue()
handler = logging.StreamHandler(sys.stdout)
listener = logging.handlers.QueueListener(que, handler)
queue_handler = logging.handlers.QueueHandler(que)
root = logging.getLogger()
root.addHandler(queue_handler)
logging.lastResort=None
listener.start()
root.warning("Look out!")`
);
kernelExecution.executeCell(cell).catch(noop);

await Promise.all([waitForTextOutput(cell, 'Look out!', 0, false)]);

const cell2 = await notebook.appendCodeCell(
dedent`
root.setLevel(logging.DEBUG)
root.debug('debug test')`
);
kernelExecution.executeCell(cell2).catch(noop);

await Promise.all([waitForTextOutput(cell, 'debug test', 1, false)]);

assert.strictEqual(cell2.outputs.length, 0);

// Clear the loggers, else rest of the tests can fail..
const cell3 = await notebook.appendCodeCell(`root.loggers.clear()`);
await kernelExecution.executeCell(cell3).catch(noop);
});

test('Clearing output while executing will ensure output is cleared', async function () {
let onDidChangeNbEventHandler = new EventEmitter<NotebookDocumentChangeEvent>();
const stub = sinon.stub(workspace, 'onDidChangeNotebookDocument');
Expand Down

0 comments on commit 9cd4ddb

Please sign in to comment.