2626import os
2727import signal
2828import threading
29+ import traceback
2930from collections import defaultdict
3031from collections .abc import Callable , Generator
3132from dataclasses import dataclass , field
5455 AnnounceMsg ,
5556 AnnounceValue ,
5657 ErrorMsg ,
58+ ErrorValue ,
5759 EventMsg ,
5860 IdentifyMsg ,
5961 IdentifyValue ,
@@ -563,8 +565,20 @@ def on_stop(self, msg: StopMsg) -> None:
563565 os .kill (pid , signal .SIGTERM )
564566
def error(self, err: Exception) -> None:
    """
    Report an exception raised in this runner to the command node.

    The exception object itself is not sent. Its type, its ``args`` and its
    traceback (formatted with :func:`traceback.format_tb`) are packed into an
    :class:`ErrorValue`, wrapped in an :class:`ErrorMsg` and sent over the
    dealer socket so that the receiving side can re-raise it.

    Args:
        err: The exception to report. If it was never raised, its
            ``__traceback__`` is ``None`` and the traceback text is empty.
    """
    # Each entry returned by format_tb already ends with a newline, so the
    # entries are concatenated as-is; joining on "\n" would insert a blank
    # line between every frame.
    tbexception = "".join(traceback.format_tb(err.__traceback__))
    self.logger.debug("Throwing error in main runner: %s", tbexception)
    msg = ErrorMsg(
        node_id=self.spec.id,
        value=ErrorValue(
            err_type=type(err),
            err_args=err.args,
            traceback=tbexception,
        ),
    )
    self._dealer.send_multipart([msg.to_bytes()])
569583
570584
@@ -583,7 +597,7 @@ class ZMQRunner(TubeRunner):
583597 _running : EventType = field (default_factory = mp .Event )
584598 _return_node : Return | None = None
585599 _init_lock : threading .Lock = field (default_factory = threading .Lock )
586- _to_throw : Exception | None = None
600+ _to_throw : ErrorValue | None = None
587601
588602 @property
589603 def running (self ) -> bool :
@@ -708,14 +722,25 @@ def _handle_error(self, msg: ErrorMsg) -> None:
708722 self .tube .scheduler .end_epoch (self ._current_epoch )
709723
710724 def _throw_error (self ) -> None :
711- err = self ._to_throw
712- if err is None :
725+ errval = self ._to_throw
726+ if errval is None :
713727 return
728+ # clear instance object and store locally, we aren't locked here.
714729 self ._to_throw = None
715730 self ._logger .debug (
716731 "Deinitializing before throwing error" ,
717732 )
718733 self .deinit ()
734+
735+ # add the traceback as a note,
736+ # sort of the best we can do without using tblib
737+ err = errval ["err_type" ](* errval ["err_args" ])
738+ tb_message = "\n Error re-raised from node runner process\n \n "
739+ tb_message += "Original traceback:\n "
740+ tb_message += "-" * 20 + "\n "
741+ tb_message += errval ["traceback" ]
742+ err .add_note (tb_message )
743+
719744 raise err
720745
721746 def enable_node (self , node_id : str ) -> None :
0 commit comments