Skip to content

Commit

Permalink
Merge pull request #3337 from martinRenou/use_control_comm_target_7.x
Browse files Browse the repository at this point in the history
[Backport 7.x] Use control comm target in LabManager
  • Loading branch information
jasongrout authored Feb 11, 2022
2 parents 02ae460 + 0296e03 commit c419bfe
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 139 deletions.
84 changes: 2 additions & 82 deletions jupyterlab_widgets/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@ import * as Backbone from 'backbone';

import {
ManagerBase, shims, IClassicComm, IWidgetRegistryData, ExportMap,
ExportData, WidgetModel, WidgetView, put_buffers, serialize_state, IStateOptions
ExportData, WidgetModel, WidgetView, serialize_state, IStateOptions
} from '@jupyter-widgets/base';

import {
IDisposable
} from '@lumino/disposable';

import {
PromiseDelegate
} from '@lumino/coreutils';

import {
Widget
} from '@lumino/widgets';
Expand Down Expand Up @@ -220,74 +216,11 @@ class WidgetManager extends ManagerBase<Widget> implements IDisposable {
return;
}
await this.context.sessionContext.ready;
// TODO: when we upgrade to @jupyterlab/services 4.1 or later, we can
// remove this 'any' cast.
if (this.context.sessionContext.session?.kernel.handleComms === false) {
return;
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(Object.keys(comm_ids).map(async (comm_id) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id: string;
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg: KernelMessage.ICommMsgMsg) => {
if ((msg.parent_header as any).msg_id === msg_id
&& msg.header.msg_type === 'comm_msg'
&& msg.content.data.method === 'update') {
let data = (msg.content.data as any);
let buffer_paths = data.buffer_paths || [];
// Make sure the buffers are DataViews
let buffers = (msg.buffers || []).map(b => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});
put_buffers(data.state, buffer_paths, buffers);
info.resolve({comm, msg});
}
});
msg_id = comm.send({
method: 'request_state'
}, this.callbacks(undefined));

return info.promise;
}
}));

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(widgets_info.map(async widget_info => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model({
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
}, content.data.state);
}));
return super._loadFromKernel();
}


Expand Down Expand Up @@ -538,16 +471,3 @@ namespace WidgetManager {
saveState: boolean
};
}


namespace Private {

/**
* Data promised when a comm info request resolves.
*/
export
interface ICommUpdateData {
comm: IClassicComm;
msg: KernelMessage.ICommMsgMsg;
}
}
258 changes: 258 additions & 0 deletions packages/base/src/manager-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import * as utils from './utils';
import * as services from '@jupyterlab/services';

import {
PromiseDelegate,
} from '@lumino/coreutils';

import {
DOMWidgetView, WidgetModel, WidgetView, DOMWidgetModel
} from './widget';
Expand All @@ -18,6 +22,21 @@ import {

const PROTOCOL_MAJOR_VERSION = PROTOCOL_VERSION.split('.', 1)[0];

/**
* The control comm target name.
*/
export const CONTROL_COMM_TARGET = 'jupyter.widget.control';

/**
* The supported version for the control comm channel.
*/
export const CONTROL_COMM_PROTOCOL_VERSION = '1.0.0';

/**
* Time (in ms) after which we consider the control comm target not responding.
*/
export const CONTROL_COMM_TIMEOUT = 4000;

/**
* The options for a model.
*
Expand Down Expand Up @@ -361,7 +380,236 @@ abstract class ManagerBase<T> {
widget_model.name = options.model_name;
widget_model.module = options.model_module;
return widget_model;
}

/**
* Fetch all widgets states from the kernel using the control comm channel
* If this fails (control comm handler not implemented kernel side),
* it will fall back to `_loadFromKernelModels`.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernel(): Promise<void> {
// Try fetching all widget states through the control comm
let data: any;
let buffers: any;
try {
const initComm = await this._create_comm(
CONTROL_COMM_TARGET,
utils.uuid(),
{},
{ version: CONTROL_COMM_PROTOCOL_VERSION }
);

await new Promise((resolve, reject) => {
initComm.on_msg((msg: any) => {
data = msg['content']['data'];

if (data.method !== 'update_states') {
console.warn(`
Unknown ${data.method} message on the Control channel
`);
return;
}

buffers = (msg.buffers || []).map((b: any) => {
if (b instanceof DataView) {
return b;
} else {
return new DataView(b instanceof ArrayBuffer ? b : b.buffer);
}
});

resolve(null);
});

initComm.on_close(() => reject('Control comm was closed too early'));

// Send a states request msg
initComm.send({ method: 'request_states' }, {});

// Reject if we didn't get a response in time
setTimeout(
() => reject('Control comm did not respond in time'),
CONTROL_COMM_TIMEOUT
);
});

initComm.close();
} catch (error) {
console.warn(
'Failed to fetch ipywidgets through the "jupyter.widget.control" comm channel, fallback to fetching individual model state. Reason:',
error
);
// Fall back to the old implementation for old ipywidgets backend versions (ipywidgets<=7.6)
return this._loadFromKernelModels();
}

const states: any = data.states;
const bufferPaths: any = {};
const bufferGroups: any = {};

// Group buffers and buffer paths by widget id
for (let i = 0; i < data.buffer_paths.length; i++) {
const [widget_id, ...path] = data.buffer_paths[i];
const b = buffers[i];
if (!bufferPaths[widget_id]) {
bufferPaths[widget_id] = [];
bufferGroups[widget_id] = [];
}
bufferPaths[widget_id].push(path);
bufferGroups[widget_id].push(b);
}

// Create comms for all new widgets.
let widget_comms = await Promise.all(
Object.keys(states).map(async (widget_id) => {
let comm = undefined;
let modelPromise = undefined;
try {
modelPromise = this.get_model(widget_id);
if (modelPromise === undefined) {
comm = await this._create_comm('jupyter.widget', widget_id);
} else {
// For JLab, the promise is rejected, so we have to await to
// find out if it is actually a model.
await modelPromise;
}
} catch (e) {
// The JLab widget manager will throw an error with this specific error message.
if (e.message !== 'widget model not found') {
throw e;
}
comm = await this._create_comm('jupyter.widget', widget_id);
}
return {widget_id, comm}
})
)

await Promise.all(widget_comms.map(async ({widget_id, comm}) => {
const state = states[widget_id];
// Put binary buffers
if (widget_id in bufferPaths) {
utils.put_buffers(
state,
bufferPaths[widget_id],
bufferGroups[widget_id]
);
}
try {

if (comm === undefined) {
// model already exists here
const model = await this.get_model(widget_id);
model!.set_state(state.state);
} else {
// This must be the first await in the code path that
// reaches here so that registering the model promise in
// new_model can register the widget promise before it may
// be required by other widgets.
await this.new_model(
{
model_name: state.model_name,
model_module: state.model_module,
model_module_version: state.model_module_version,
model_id: widget_id,
comm: comm,
},
state.state
);
}

} catch (error) {
// Failed to create a widget model, we continue creating other models so that
// other widgets can render
console.error(error);
}
}));
}

/**
* Old implementation of fetching widget models one by one using
* the request_state message on each comm.
*
* This is a utility function that can be used in subclasses.
*/
protected async _loadFromKernelModels(): Promise<void> {
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async (comm_id) => {
try {
const model = this.get_model(comm_id);
// TODO Have the same this.get_model implementation for
// the widgetsnbextension and labextension, the one that
// throws an error if the model is not found instead of
// returning undefined
if (model === undefined) {
throw new Error('widget model not found');
}
await model;
// If we successfully get the model, do no more.
return;
} catch (e) {
// If we have the widget model not found error, then we can create the
// widget. Otherwise, rethrow the error. We have to check the error
// message text explicitly because the get_model function in this
// class throws a generic error with this specific text.
if (e.message !== 'widget model not found') {
throw e;
}
const comm = await this._create_comm(this.comm_target_name, comm_id);

let msg_id = '';
const info = new PromiseDelegate<Private.ICommUpdateData>();
comm.on_msg((msg) => {
if (
(msg.parent_header as any).msg_id === msg_id &&
msg.header.msg_type === 'comm_msg' &&
msg.content.data.method === 'update'
) {
const data = msg.content.data as any;
const buffer_paths = data.buffer_paths || [];
const buffers = msg.buffers || [];
utils.put_buffers(data.state, buffer_paths, buffers);
info.resolve({ comm, msg });
}
});
msg_id = comm.send(
{
method: 'request_state',
},
this.callbacks(undefined)
);

return info.promise;
}
})
);

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
// registers the widget ids before reconstructing their state
// asynchronously, so promises to every widget reference should be available
// by the time they are used.
await Promise.all(
widgets_info.map(async (widget_info) => {
if (!widget_info) {
return;
}
const content = widget_info.msg.content as any;
await this.new_model(
{
model_name: content.data.state._model_name,
model_module: content.data.state._model_module,
model_module_version: content.data.state._model_module_version,
comm: widget_info.comm,
},
content.data.state
);
})
);
}

/**
Expand Down Expand Up @@ -586,3 +834,13 @@ function serialize_state(models: WidgetModel[], options: IStateOptions = {}) {
});
return {version_major: 2, version_minor: 0, state: state};
}

namespace Private {
/**
* Data promised when a comm info request resolves.
*/
export interface ICommUpdateData {
comm: IClassicComm;
msg: services.KernelMessage.ICommMsgMsg;
}
}
Loading

0 comments on commit c419bfe

Please sign in to comment.