Skip to content

Commit 796a246

Browse files
committed
Consolidate duplicated exec/eval routes and init logic
1 parent f4cfa69 commit 796a246

File tree

2 files changed

+75
-116
lines changed

2 files changed

+75
-116
lines changed

pathview_server/app.py

Lines changed: 24 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -337,15 +337,16 @@ def api_init():
337337
except Exception as e:
338338
return jsonify({"type": "error", "error": str(e)}), 500
339339

340-
@app.route("/api/exec", methods=["POST"])
341-
def api_exec():
342-
"""Execute Python code in the session's worker."""
340+
def _handle_worker_request(msg: dict, success_type: str) -> tuple:
341+
"""Send a message to the worker and collect the response.
342+
343+
Shared by api_exec (success_type="ok") and api_eval (success_type="value").
344+
Returns a Flask response tuple.
345+
"""
343346
session_id = _get_session_id()
344347
if not session_id:
345348
return jsonify({"type": "error", "error": "Missing X-Session-ID header"}), 400
346-
data = request.get_json(force=True)
347-
code = data.get("code", "")
348-
msg_id = data.get("id", str(uuid.uuid4()))
349+
msg_id = msg.get("id", str(uuid.uuid4()))
349350

350351
session = get_or_create_session(session_id)
351352
with session.lock:
@@ -354,7 +355,7 @@ def api_exec():
354355
# reading stdout — prevents concurrent pipe reads / JSONDecodeError
355356
session.wait_for_stream_reader()
356357
session.ensure_initialized()
357-
session.send_message({"type": "exec", "id": msg_id, "code": code})
358+
session.send_message(msg)
358359

359360
stdout_lines = []
360361
stderr_lines = []
@@ -368,8 +369,8 @@ def api_exec():
368369
stdout_lines.append(resp.get("value", ""))
369370
elif resp_type == "stderr":
370371
stderr_lines.append(resp.get("value", ""))
371-
elif resp_type == "ok" and resp.get("id") == msg_id:
372-
result = {"type": "ok", "id": msg_id}
372+
elif resp_type == success_type and resp.get("id") == msg_id:
373+
result = resp
373374
if stdout_lines:
374375
result["stdout"] = "".join(stdout_lines)
375376
if stderr_lines:
@@ -389,57 +390,25 @@ def api_exec():
389390
except Exception as e:
390391
return jsonify({"type": "error", "id": msg_id, "error": str(e)}), 500
391392

393+
@app.route("/api/exec", methods=["POST"])
394+
def api_exec():
395+
"""Execute Python code in the session's worker."""
396+
data = request.get_json(force=True)
397+
msg_id = data.get("id", str(uuid.uuid4()))
398+
return _handle_worker_request(
399+
{"type": "exec", "id": msg_id, "code": data.get("code", "")},
400+
success_type="ok",
401+
)
402+
392403
@app.route("/api/eval", methods=["POST"])
393404
def api_eval():
394405
"""Evaluate a Python expression in the session's worker."""
395-
session_id = _get_session_id()
396-
if not session_id:
397-
return jsonify({"type": "error", "error": "Missing X-Session-ID header"}), 400
398406
data = request.get_json(force=True)
399-
expr = data.get("expr", "")
400407
msg_id = data.get("id", str(uuid.uuid4()))
401-
402-
session = get_or_create_session(session_id)
403-
with session.lock:
404-
try:
405-
# Wait for any lingering stream reader thread to exit before
406-
# reading stdout — prevents concurrent pipe reads / JSONDecodeError
407-
session.wait_for_stream_reader()
408-
session.ensure_initialized()
409-
session.send_message({"type": "eval", "id": msg_id, "expr": expr})
410-
411-
stdout_lines = []
412-
stderr_lines = []
413-
while True:
414-
resp = session.read_line_timeout()
415-
if resp is None:
416-
remove_session(session_id)
417-
return jsonify({"type": "error", "errorType": "worker-crashed", "id": msg_id, "error": "Worker process died"}), 500
418-
resp_type = resp.get("type")
419-
if resp_type == "stdout":
420-
stdout_lines.append(resp.get("value", ""))
421-
elif resp_type == "stderr":
422-
stderr_lines.append(resp.get("value", ""))
423-
elif resp_type == "value" and resp.get("id") == msg_id:
424-
result = resp
425-
if stdout_lines:
426-
result["stdout"] = "".join(stdout_lines)
427-
if stderr_lines:
428-
result["stderr"] = "".join(stderr_lines)
429-
return jsonify(result)
430-
elif resp_type == "error" and resp.get("id") == msg_id:
431-
result = resp
432-
if stdout_lines:
433-
result["stdout"] = "".join(stdout_lines)
434-
if stderr_lines:
435-
result["stderr"] = "".join(stderr_lines)
436-
return jsonify(result), 400
437-
438-
except TimeoutError:
439-
remove_session(session_id)
440-
return jsonify({"type": "error", "errorType": "timeout", "id": msg_id, "error": "Execution timed out"}), 504
441-
except Exception as e:
442-
return jsonify({"type": "error", "id": msg_id, "error": str(e)}), 500
408+
return _handle_worker_request(
409+
{"type": "eval", "id": msg_id, "expr": data.get("expr", "")},
410+
success_type="value",
411+
)
443412

444413
@app.route("/api/stream/start", methods=["POST"])
445414
def api_stream_start():

src/lib/pyodide/backend/flask/backend.ts

Lines changed: 51 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ import { TIMEOUTS } from '$lib/constants/python';
1313
import { STATUS_MESSAGES } from '$lib/constants/messages';
1414
import { PYTHON_PACKAGES } from '$lib/constants/dependencies';
1515

16-
/** Polling interval for stream results (ms) */
17-
const STREAM_POLL_INTERVAL = 30;
16+
/** Delay between polls (ms). The server uses long-polling (blocks up
17+
* to 100 ms until data arrives), so data delivery is near-instant.
18+
* This interval is just a safety gap between consecutive requests. */
19+
const STREAM_POLL_INTERVAL = 5;
1820

1921
/** BroadcastChannel name for cross-tab session coordination */
2022
const SESSION_CHANNEL = 'flask-session';
@@ -86,28 +88,7 @@ export class FlaskBackend implements Backend {
8688

8789
backendState.update((s) => ({ ...s, progress: 'Initializing Python worker...' }));
8890

89-
const initResp = await fetch(`${this.host}/api/init`, {
90-
method: 'POST',
91-
headers: {
92-
'Content-Type': 'application/json',
93-
'X-Session-ID': this.sessionId
94-
},
95-
body: JSON.stringify({ packages: PYTHON_PACKAGES }),
96-
signal: AbortSignal.timeout(TIMEOUTS.INIT)
97-
});
98-
const initData = await initResp.json();
99-
100-
if (initData.type === 'error') throw new Error(initData.error);
101-
102-
if (initData.messages) {
103-
for (const msg of initData.messages) {
104-
if (msg.type === 'stdout' && this.stdoutCallback) this.stdoutCallback(msg.value);
105-
if (msg.type === 'stderr' && this.stderrCallback) this.stderrCallback(msg.value);
106-
if (msg.type === 'progress') {
107-
backendState.update((s) => ({ ...s, progress: msg.value }));
108-
}
109-
}
110-
}
91+
await this.postInit({ updateProgress: true });
11192

11293
this.serverInitPromise = Promise.resolve();
11394

@@ -126,6 +107,10 @@ export class FlaskBackend implements Backend {
126107

127108
terminate(): void {
128109
this.stopStreaming();
110+
if (this.streamPollTimer) {
111+
clearTimeout(this.streamPollTimer);
112+
this.streamPollTimer = null;
113+
}
129114
this._isStreaming = false;
130115
this.streamState = { onData: null, onDone: null, onError: null };
131116

@@ -172,25 +157,7 @@ export class FlaskBackend implements Backend {
172157
private ensureServerInit(): Promise<void> {
173158
if (this.serverInitPromise) return this.serverInitPromise;
174159

175-
this.serverInitPromise = (async () => {
176-
const resp = await fetch(`${this.host}/api/init`, {
177-
method: 'POST',
178-
headers: {
179-
'Content-Type': 'application/json',
180-
'X-Session-ID': this.sessionId
181-
},
182-
body: JSON.stringify({ packages: PYTHON_PACKAGES }),
183-
signal: AbortSignal.timeout(TIMEOUTS.INIT)
184-
});
185-
const data = await resp.json();
186-
if (data.type === 'error') throw new Error(data.error);
187-
if (data.messages) {
188-
for (const msg of data.messages) {
189-
if (msg.type === 'stdout' && this.stdoutCallback) this.stdoutCallback(msg.value);
190-
if (msg.type === 'stderr' && this.stderrCallback) this.stderrCallback(msg.value);
191-
}
192-
}
193-
})();
160+
this.serverInitPromise = this.postInit({ updateProgress: false });
194161

195162
// Clear on failure so subsequent calls retry instead of returning the rejected promise
196163
this.serverInitPromise.catch(() => {
@@ -271,6 +238,13 @@ export class FlaskBackend implements Backend {
271238
this.stopStreaming();
272239
}
273240

241+
// Clear any lingering poll timer from a previous stream so it
242+
// doesn't pick up a stale stream-done and fire the NEW onDone.
243+
if (this.streamPollTimer) {
244+
clearTimeout(this.streamPollTimer);
245+
this.streamPollTimer = null;
246+
}
247+
274248
const id = this.generateId();
275249
this._isStreaming = true;
276250
this.streamState = {
@@ -296,7 +270,6 @@ export class FlaskBackend implements Backend {
296270
if (data.type === 'error') {
297271
throw new Error(data.error);
298272
}
299-
// Start polling loop — same as Pyodide worker's onmessage dispatching
300273
this.pollStreamResults();
301274
})
302275
.catch((error) => {
@@ -309,29 +282,19 @@ export class FlaskBackend implements Backend {
309282
stopStreaming(): void {
310283
if (!this._isStreaming) return;
311284

312-
// Stop polling timer — the server will send stream-done which triggers onDone
313-
if (this.streamPollTimer) {
314-
clearTimeout(this.streamPollTimer);
315-
this.streamPollTimer = null;
316-
}
317-
318-
// Tell server to stop, then do one final poll to get the stream-done message
285+
// Just send the stop signal — don't disrupt the polling loop.
286+
// The poll loop will naturally pick up stream-done and clean up,
287+
// matching how Pyodide's stopStreaming just sends stream-stop and
288+
// lets worker.onmessage handle the rest.
319289
fetch(`${this.host}/api/stream/stop`, {
320290
method: 'POST',
321291
headers: {
322292
'Content-Type': 'application/json',
323293
'X-Session-ID': this.sessionId
324294
}
325-
})
326-
.then(() => this.pollStreamResults())
327-
.catch(() => {
328-
// If final poll fails, clean up locally
329-
this._isStreaming = false;
330-
if (this.streamState.onDone) {
331-
this.streamState.onDone();
332-
}
333-
this.streamState = { onData: null, onDone: null, onError: null };
334-
});
295+
}).catch(() => {
296+
// Network failure — poll loop will also fail and clean up
297+
});
335298
}
336299

337300
isStreaming(): boolean {
@@ -374,6 +337,33 @@ export class FlaskBackend implements Backend {
374337
return `repl_${++this.messageId}`;
375338
}
376339

340+
/**
341+
* POST /api/init with packages and forward worker messages to callbacks.
342+
* Shared by init() (first load with progress UI) and ensureServerInit() (lazy re-init).
343+
*/
344+
private async postInit(opts: { updateProgress: boolean }): Promise<void> {
345+
const resp = await fetch(`${this.host}/api/init`, {
346+
method: 'POST',
347+
headers: {
348+
'Content-Type': 'application/json',
349+
'X-Session-ID': this.sessionId
350+
},
351+
body: JSON.stringify({ packages: PYTHON_PACKAGES }),
352+
signal: AbortSignal.timeout(TIMEOUTS.INIT)
353+
});
354+
const data = await resp.json();
355+
if (data.type === 'error') throw new Error(data.error);
356+
if (data.messages) {
357+
for (const msg of data.messages) {
358+
if (msg.type === 'stdout' && this.stdoutCallback) this.stdoutCallback(msg.value);
359+
if (msg.type === 'stderr' && this.stderrCallback) this.stderrCallback(msg.value);
360+
if (msg.type === 'progress' && opts.updateProgress) {
361+
backendState.update((s) => ({ ...s, progress: msg.value }));
362+
}
363+
}
364+
}
365+
}
366+
377367
/**
378368
* Check if a response indicates the worker crashed or timed out.
379369
* If so, clear serverInitPromise so the next request triggers re-init.

0 commit comments

Comments
 (0)