diff --git a/python/packages/core/agent_framework/_workflows/_agent.py b/python/packages/core/agent_framework/_workflows/_agent.py index e1ae5302d1..b6debe4eb9 100644 --- a/python/packages/core/agent_framework/_workflows/_agent.py +++ b/python/packages/core/agent_framework/_workflows/_agent.py @@ -196,7 +196,7 @@ async def _run_stream_impl( """ # Determine the event stream based on whether we have function responses if bool(self.pending_requests): - # This is a continuation - use send_responses_streaming to send function responses back + # This is a continuation - use run_stream with responses to send function responses back logger.info(f"Continuing workflow to address {len(self.pending_requests)} requests") # Extract function responses from input messages, and ensure that @@ -212,7 +212,7 @@ async def _run_stream_impl( # NOTE: It is possible that some pending requests are not fulfilled, # and we will let the workflow to handle this -- the agent does not # have an opinion on this. - event_stream = self.workflow.send_responses_streaming(function_responses) + event_stream = self.workflow.run_stream(responses=function_responses) else: # Execute workflow with streaming (initial run or no function responses) # Pass the new input messages directly to the workflow diff --git a/python/packages/core/agent_framework/_workflows/_magentic.py b/python/packages/core/agent_framework/_workflows/_magentic.py index 8368b23845..91f56e837e 100644 --- a/python/packages/core/agent_framework/_workflows/_magentic.py +++ b/python/packages/core/agent_framework/_workflows/_magentic.py @@ -2458,17 +2458,6 @@ async def _validate_checkpoint_participants( f"Missing names: {missing}; unexpected names: {unexpected}." ) - async def run_stream_from_checkpoint( - self, - checkpoint_id: str, - checkpoint_storage: CheckpointStorage | None = None, - responses: dict[str, Any] | None = None, - ) -> AsyncIterable[WorkflowEvent]: - """Resume orchestration from a checkpoint and stream resulting events.""" - await self._validate_checkpoint_participants(checkpoint_id, checkpoint_storage) - async for event in self._workflow.run_stream_from_checkpoint(checkpoint_id, checkpoint_storage, responses): - yield event - async def run_with_string(self, task_text: str) -> WorkflowRunResult: """Run the workflow with a task string and return all events. @@ -2512,33 +2501,6 @@ async def run(self, message: Any | None = None) -> WorkflowRunResult: events.append(event) return WorkflowRunResult(events) - async def run_from_checkpoint( - self, - checkpoint_id: str, - checkpoint_storage: CheckpointStorage | None = None, - responses: dict[str, Any] | None = None, - ) -> WorkflowRunResult: - """Resume orchestration from a checkpoint and collect all resulting events.""" - events: list[WorkflowEvent] = [] - async for event in self.run_stream_from_checkpoint(checkpoint_id, checkpoint_storage, responses): - events.append(event) - return WorkflowRunResult(events) - - async def send_responses_streaming(self, responses: dict[str, Any]) -> AsyncIterable[WorkflowEvent]: - """Forward responses to pending requests and stream resulting events. - - This delegates to the underlying Workflow implementation. - """ - async for event in self._workflow.send_responses_streaming(responses): - yield event - - async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult: - """Forward responses to pending requests and return all resulting events. - - This delegates to the underlying Workflow implementation. 
- """ - return await self._workflow.send_responses(responses) - def __getattr__(self, name: str) -> Any: """Delegate unknown attributes to the underlying workflow.""" return getattr(self._workflow, name) diff --git a/python/packages/core/agent_framework/_workflows/_runner_context.py b/python/packages/core/agent_framework/_workflows/_runner_context.py index d91e73a69a..b3892288ab 100644 --- a/python/packages/core/agent_framework/_workflows/_runner_context.py +++ b/python/packages/core/agent_framework/_workflows/_runner_context.py @@ -122,6 +122,18 @@ def has_checkpointing(self) -> bool: """ ... + def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None: + """Set runtime checkpoint storage to override build-time configuration. + + Args: + storage: The checkpoint storage to use for this run. + """ + ... + + def clear_runtime_checkpoint_storage(self) -> None: + """Clear runtime checkpoint storage override.""" + ... + # Checkpointing APIs (optional, enabled by storage) def set_workflow_id(self, workflow_id: str) -> None: """Set the workflow ID for the context.""" @@ -202,6 +214,7 @@ def __init__(self, checkpoint_storage: CheckpointStorage | None = None): # Checkpointing configuration/state self._checkpoint_storage = checkpoint_storage + self._runtime_checkpoint_storage: CheckpointStorage | None = None self._workflow_id: str | None = None # Streaming flag - set by workflow's run_stream() vs run() @@ -252,8 +265,24 @@ async def next_event(self) -> WorkflowEvent: # region Checkpointing + def _get_effective_checkpoint_storage(self) -> CheckpointStorage | None: + """Get the effective checkpoint storage (runtime override or build-time).""" + return self._runtime_checkpoint_storage or self._checkpoint_storage + + def set_runtime_checkpoint_storage(self, storage: CheckpointStorage) -> None: + """Set runtime checkpoint storage to override build-time configuration. + + Args: + storage: The checkpoint storage to use for this run. + """ + self._runtime_checkpoint_storage = storage + + def clear_runtime_checkpoint_storage(self) -> None: + """Clear runtime checkpoint storage override.""" + self._runtime_checkpoint_storage = None + def has_checkpointing(self) -> bool: - return self._checkpoint_storage is not None + return self._get_effective_checkpoint_storage() is not None async def create_checkpoint( self, @@ -261,7 +290,8 @@ async def create_checkpoint( iteration_count: int, metadata: dict[str, Any] | None = None, ) -> str: - if not self._checkpoint_storage: + storage = self._get_effective_checkpoint_storage() + if not storage: raise ValueError("Checkpoint storage not configured") self._workflow_id = self._workflow_id or str(uuid.uuid4()) @@ -274,14 +304,15 @@ async def create_checkpoint( iteration_count=state["iteration_count"], metadata=metadata or {}, ) - checkpoint_id = await self._checkpoint_storage.save_checkpoint(checkpoint) + checkpoint_id = await storage.save_checkpoint(checkpoint) logger.info(f"Created checkpoint {checkpoint_id} for workflow {self._workflow_id}") return checkpoint_id async def load_checkpoint(self, checkpoint_id: str) -> WorkflowCheckpoint | None: - if not self._checkpoint_storage: + storage = self._get_effective_checkpoint_storage() + if not storage: raise ValueError("Checkpoint storage not configured") - return await self._checkpoint_storage.load_checkpoint(checkpoint_id) + return await storage.load_checkpoint(checkpoint_id) def reset_for_new_run(self) -> None: """Reset the context for a new workflow run. 
diff --git a/python/packages/core/agent_framework/_workflows/_workflow.py b/python/packages/core/agent_framework/_workflows/_workflow.py index ed9352bb42..cb9aea22b7 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow.py +++ b/python/packages/core/agent_framework/_workflows/_workflow.py @@ -132,19 +132,33 @@ class Workflow(DictConvertible): Access these via the input_types and output_types properties. ## Execution Methods - - run(): Execute to completion, returns WorkflowRunResult with all events - - run_stream(): Returns async generator yielding events as they occur - - run_from_checkpoint(): Resume from a saved checkpoint - - run_stream_from_checkpoint(): Resume from checkpoint with streaming + The workflow provides two primary execution APIs, each supporting multiple scenarios: + + - **run()**: Execute to completion, returns WorkflowRunResult with all events + - **run_stream()**: Returns async generator yielding events as they occur + + Both methods support: + - Initial workflow runs: Provide `message` parameter + - Checkpoint restoration: Provide `checkpoint_id` (and optionally `checkpoint_storage`) + - HIL continuation: Provide `responses` to continue after RequestInfoExecutor requests + - Runtime checkpointing: Provide `checkpoint_storage` to enable/override checkpointing for this run ## External Input Requests Workflows can request external input using a RequestInfoExecutor: 1. Executor connects to RequestInfoExecutor via edge group and back to itself 2. Executor sends RequestInfoMessage to RequestInfoExecutor 3. RequestInfoExecutor emits RequestInfoEvent and workflow enters IDLE_WITH_PENDING_REQUESTS - 4. Caller handles requests and uses send_responses()/send_responses_streaming() to continue + 4. Caller handles requests and uses run()/run_stream() with responses parameter to continue ## Checkpointing + Checkpointing can be configured at build time or runtime: + + Build-time (via WorkflowBuilder): + workflow = WorkflowBuilder().with_checkpointing(storage).build() + + Runtime (via run/run_stream parameters): + result = await workflow.run(message, checkpoint_storage=runtime_storage) + When enabled, checkpoints are created at the end of each superstep, capturing: - Executor states - Messages in transit @@ -369,351 +383,402 @@ async def _run_workflow_with_tracing( capture_exception(span, exception=exc) raise - async def run_stream(self, message: Any) -> AsyncIterable[WorkflowEvent]: - """Run the workflow with a starting message and stream events. - - Args: - message: The message to be sent to the starting executor. - - Yields: - WorkflowEvent: The events generated during the workflow execution. 
- """ - self._ensure_not_running() - try: - - async def initial_execution() -> None: - executor = self.get_start_executor() - await executor.execute( - message, - [self.__class__.__name__], # source_executor_ids - self._shared_state, # shared_state - self._runner.context, # runner_context - trace_contexts=None, # No parent trace context for workflow start - source_span_ids=None, # No source span for workflow start - ) - - async for event in self._run_workflow_with_tracing( - initial_executor_fn=initial_execution, reset_context=True, streaming=True - ): - yield event - finally: - self._reset_running_flag() - - async def run_stream_from_checkpoint( + async def run_stream( self, - checkpoint_id: str, + message: Any | None = None, + *, + checkpoint_id: str | None = None, checkpoint_storage: CheckpointStorage | None = None, responses: dict[str, Any] | None = None, ) -> AsyncIterable[WorkflowEvent]: - """Resume workflow execution from a checkpoint and stream events. + """Run the workflow and stream events. + + Unified streaming interface supporting initial runs, checkpoint restoration, and HIL responses. Args: - checkpoint_id: The ID of the checkpoint to restore from. - checkpoint_storage: Optional checkpoint storage to use for restoration. - If not provided, the workflow must have been built with checkpointing enabled. - responses: Optional dictionary of responses to inject into the workflow - after restoration. Keys are request IDs, values are response data. + message: Initial message for the start executor. Required for new workflow runs, + should be None when resuming from checkpoint or sending HIL responses. + checkpoint_id: ID of checkpoint to restore from. If provided, the workflow resumes + from this checkpoint instead of starting fresh. When resuming, checkpoint_storage + must be provided (either at build time or runtime) to load the checkpoint. + checkpoint_storage: Runtime checkpoint storage with two behaviors: + - With checkpoint_id: Used to load and restore the specified checkpoint + - Without checkpoint_id: Enables checkpointing for this run, overriding + build-time configuration + responses: HIL responses to inject. Dictionary mapping request IDs (from RequestInfoEvent.request_id) + to response data. The request_id correlation ensures responses are matched to the correct + pending requests. Yields: - WorkflowEvent: Events generated during workflow execution. + WorkflowEvent: Events generated during workflow execution. RequestInfoEvent instances contain + request_id fields that must be used as keys in the responses dictionary. Raises: - ValueError: If neither checkpoint_storage is provided nor checkpointing is enabled. + ValueError: If both message and checkpoint_id are provided, or if neither is provided + when responses is also None. + ValueError: If checkpoint_id is provided but no checkpoint storage is available + (neither at build time nor runtime). RuntimeError: If checkpoint restoration fails. - """ - self._ensure_not_running() - try: - async def checkpoint_restoration() -> None: - has_checkpointing = self._runner.context.has_checkpointing() + Examples: + Initial run: - if not has_checkpointing and checkpoint_storage is None: - raise ValueError( - "Cannot restore from checkpoint: either provide checkpoint_storage parameter " - "or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)." - ) + .. 
code-block:: python - restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) + async for event in workflow.run_stream("start message"): + process(event) - if not restored: - raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}") + Enable checkpointing at runtime: - # Process any pending messages from the checkpoint first - # This ensures that RequestInfoExecutor state is properly populated - # before we try to handle responses - if await self._runner.context.has_messages(): - # Run one iteration to process pending messages - # This will populate RequestInfoExecutor._request_events properly - await self._runner._run_iteration() # type: ignore + .. code-block:: python - if responses: - request_info_executor = self._find_request_info_executor() - if request_info_executor: - for request_id, response_data in responses.items(): - ctx: WorkflowContext[Any] = WorkflowContext( - request_info_executor.id, - [self.__class__.__name__], - self._shared_state, - self._runner.context, - trace_contexts=None, # No parent trace context for new workflow span - source_span_ids=None, # No source span for response handling - ) + storage = FileCheckpointStorage("./checkpoints") + async for event in workflow.run_stream("start", checkpoint_storage=storage): + process(event) - if not await request_info_executor.has_pending_request(request_id, ctx): - logger.debug( - f"Skipping pre-supplied response for request {request_id}; " - f"no pending request found after checkpoint restoration." - ) - continue - - await request_info_executor.handle_response( - response_data, - request_id, - ctx, - ) + Resume from checkpoint (storage provided at build time): - async for event in self._run_workflow_with_tracing( - initial_executor_fn=checkpoint_restoration, - reset_context=False, # Don't reset context when resuming from checkpoint - streaming=True, - ): - yield event - finally: - self._reset_running_flag() + .. code-block:: python - async def send_responses_streaming(self, responses: dict[str, Any]) -> AsyncIterable[WorkflowEvent]: - """Send responses back to the workflow and stream the events generated by the workflow. + async for event in workflow.run_stream(checkpoint_id="cp_123"): + process(event) - Args: - responses: The responses to be sent back to the workflow, where keys are request IDs - and values are the corresponding response data. + Resume from checkpoint (storage provided at runtime): - Yields: - WorkflowEvent: The events generated during the workflow execution after sending the responses. + .. code-block:: python + + storage = FileCheckpointStorage("./checkpoints") + async for event in workflow.run_stream(checkpoint_id="cp_123", checkpoint_storage=storage): + process(event) + + Send HIL responses (request_id from RequestInfoEvent): + + .. code-block:: python + + # First, collect request ID from the event + async for event in workflow.run_stream("task"): + if isinstance(event, RequestInfoEvent): + request_id = event.request_id # Use this ID for correlation + break + # Then send response using the request_id + async for event in workflow.run_stream(responses={request_id: "approved"}): + process(event) + + Resume from checkpoint AND send HIL responses: + + .. 
code-block:: python + + async for event in workflow.run_stream(checkpoint_id="cp_123", responses={"req_1": "approved"}): + process(event) """ + # Validate mutually exclusive parameters BEFORE setting running flag + if message is not None and checkpoint_id is not None: + raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. Use one or the other.") + + if message is None and checkpoint_id is None and responses is None: + raise ValueError( + "Must provide at least one of: 'message' (new run), 'checkpoint_id' (resume), " + "or 'responses' (HIL continuation)." + ) + + # Prevent invalid combination: both message and responses provided + if message is not None and responses is not None: + raise ValueError( + "Cannot provide both 'message' and 'responses'. Use 'message' for new runs, " + "'responses' for HIL continuation." + ) + self._ensure_not_running() + + # Enable runtime checkpointing if storage provided + # Two cases: + # 1. checkpoint_storage + checkpoint_id: Load checkpoint from this storage and resume + # 2. checkpoint_storage without checkpoint_id: Enable checkpointing for this run + if checkpoint_storage is not None: + self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) + try: - async def send_responses() -> None: - request_info_executor = self._find_request_info_executor() - if not request_info_executor: - raise ValueError("No RequestInfoExecutor found in workflow.") - - async def _handle_response(response: Any, request_id: str) -> None: - """Handle the response from the RequestInfoExecutor.""" - await request_info_executor.handle_response( - response, - request_id, - WorkflowContext( + async def execution_handler() -> None: + # Handle checkpoint restoration + if checkpoint_id is not None: + has_checkpointing = self._runner.context.has_checkpointing() + + if not has_checkpointing and checkpoint_storage is None: + raise ValueError( + "Cannot restore from checkpoint: either provide checkpoint_storage parameter " + "or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)." + ) + + restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) + + if not restored: + raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}") + + # Process pending messages from checkpoint + if await self._runner.context.has_messages(): + await self._runner._run_iteration() # type: ignore + + # Handle initial message + elif message is not None: + executor = self.get_start_executor() + await executor.execute( + message, + [self.__class__.__name__], + self._shared_state, + self._runner.context, + trace_contexts=None, + source_span_ids=None, + ) + + # Handle HIL responses + if responses: + request_info_executor = self._find_request_info_executor() + if not request_info_executor: + raise ValueError("No RequestInfoExecutor found in workflow.") + + async def _handle_response(response: Any, request_id: str) -> None: + ctx: WorkflowContext[Any] = WorkflowContext( request_info_executor.id, [self.__class__.__name__], self._shared_state, self._runner.context, - trace_contexts=None, # No parent trace context for new workflow span - source_span_ids=None, # No source span for response handling - ), - ) + trace_contexts=None, + source_span_ids=None, + ) + + if checkpoint_id and not await request_info_executor.has_pending_request(request_id, ctx): + logger.debug( + f"Skipping response for request {request_id}; " + f"no pending request found after checkpoint restoration." 
+ ) + return + + await request_info_executor.handle_response(response, request_id, ctx) + + await asyncio.gather(*[ + _handle_response(response, request_id) for request_id, response in responses.items() + ]) - await asyncio.gather(*[ - _handle_response(response, request_id) for request_id, response in responses.items() - ]) + # Reset context only for new runs (not checkpoint restoration or HIL continuation) + reset_context = message is not None and checkpoint_id is None and responses is None async for event in self._run_workflow_with_tracing( - initial_executor_fn=send_responses, - reset_context=False, # Don't reset context when sending responses + initial_executor_fn=execution_handler, + reset_context=reset_context, streaming=True, ): yield event finally: + # Clear runtime checkpoint storage after run completes + if checkpoint_storage is not None: + self._runner.context.clear_runtime_checkpoint_storage() self._reset_running_flag() - async def run(self, message: Any, *, include_status_events: bool = False) -> WorkflowRunResult: - """Run the workflow with the given message. + async def run( + self, + message: Any | None = None, + *, + checkpoint_id: str | None = None, + checkpoint_storage: CheckpointStorage | None = None, + responses: dict[str, Any] | None = None, + include_status_events: bool = False, + ) -> WorkflowRunResult: + """Run the workflow to completion and return all events. + + Unified non-streaming interface supporting initial runs, checkpoint restoration, and HIL responses. Args: - message: The message to be processed by the workflow. + message: Initial message for the start executor. Required for new workflow runs, + should be None when resuming from checkpoint or sending HIL responses. + checkpoint_id: ID of checkpoint to restore from. If provided, the workflow resumes + from this checkpoint instead of starting fresh. When resuming, checkpoint_storage + must be provided (either at build time or runtime) to load the checkpoint. + checkpoint_storage: Runtime checkpoint storage with two behaviors: + - With checkpoint_id: Used to load and restore the specified checkpoint + - Without checkpoint_id: Enables checkpointing for this run, overriding + build-time configuration + responses: HIL responses to inject. Dictionary mapping request IDs (from RequestInfoEvent.request_id) + to response data. The request_id correlation ensures responses are matched to the correct + pending requests. include_status_events: Whether to include WorkflowStatusEvent instances in the result list. Returns: - A WorkflowRunResult instance containing a list of events generated during the workflow execution. - """ - self._ensure_not_running() - try: - - async def initial_execution() -> None: - executor = self.get_start_executor() - await executor.execute( - message, - [self.__class__.__name__], # source_executor_ids - self._shared_state, # shared_state - self._runner.context, # runner_context - trace_contexts=None, # No parent trace context for workflow start - source_span_ids=None, # No source span for workflow start - ) + A WorkflowRunResult instance containing events generated during workflow execution. - raw_events = [ - event - async for event in self._run_workflow_with_tracing( - initial_executor_fn=initial_execution, - reset_context=True, - ) - ] - finally: - self._reset_running_flag() + Raises: + ValueError: If both message and checkpoint_id are provided, or if neither is provided + when responses is also None. 
+ ValueError: If checkpoint_id is provided but no checkpoint storage is available + (neither at build time nor runtime). + RuntimeError: If checkpoint restoration fails. - # Filter events for non-streaming mode - filtered: list[WorkflowEvent] = [] - status_events: list[WorkflowStatusEvent] = [] + Examples: + Initial run: - for ev in raw_events: - # Omit WorkflowStartedEvent from non-streaming (telemetry-only) - if isinstance(ev, WorkflowStartedEvent): - continue - # Track status; include inline only if explicitly requested - if isinstance(ev, WorkflowStatusEvent): - status_events.append(ev) - if include_status_events: - filtered.append(ev) - continue - filtered.append(ev) + .. code-block:: python - return WorkflowRunResult(filtered, status_events) + result = await workflow.run("start message") + outputs = result.get_outputs() - async def run_from_checkpoint( - self, - checkpoint_id: str, - checkpoint_storage: CheckpointStorage | None = None, - responses: dict[str, Any] | None = None, - ) -> WorkflowRunResult: - """Resume workflow execution from a checkpoint. + Enable checkpointing at runtime: - Args: - checkpoint_id: The ID of the checkpoint to restore from. - checkpoint_storage: Optional checkpoint storage to use for restoration. - If not provided, the workflow must have been built with checkpointing enabled. - responses: Optional dictionary of responses to inject into the workflow - after restoration. Keys are request IDs, values are response data. + .. code-block:: python - Returns: - A WorkflowRunResult instance containing a list of events generated during the workflow execution. + storage = FileCheckpointStorage("./checkpoints") + result = await workflow.run("start", checkpoint_storage=storage) - Raises: - ValueError: If neither checkpoint_storage is provided nor checkpointing is enabled. - RuntimeError: If checkpoint restoration fails. - """ - self._ensure_not_running() - try: + Resume from checkpoint (storage provided at build time): - async def checkpoint_restoration() -> None: - has_checkpointing = self._runner.context.has_checkpointing() + .. code-block:: python - if not has_checkpointing and checkpoint_storage is None: - raise ValueError( - "Cannot restore from checkpoint: either provide checkpoint_storage parameter " - "or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)." - ) + result = await workflow.run(checkpoint_id="cp_123") - restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) + Resume from checkpoint (storage provided at runtime): - if not restored: - raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}") + .. 
code-block:: python - # Process any pending messages from the checkpoint first - # This ensures that RequestInfoExecutor state is properly populated - # before we try to handle responses - if await self._runner.context.has_messages(): - # Run one iteration to process pending messages - # This will populate RequestInfoExecutor._request_events properly - await self._runner._run_iteration() # type: ignore + storage = FileCheckpointStorage("./checkpoints") + result = await workflow.run(checkpoint_id="cp_123", checkpoint_storage=storage) - if responses: - request_info_executor = self._find_request_info_executor() - if request_info_executor: - for request_id, response_data in responses.items(): - ctx: WorkflowContext[Any] = WorkflowContext( - request_info_executor.id, - [self.__class__.__name__], - self._shared_state, - self._runner.context, - trace_contexts=None, # No parent trace context for new workflow span - source_span_ids=None, # No source span for response handling - ) + Send HIL responses (request_id from RequestInfoEvent): - if not await request_info_executor.has_pending_request(request_id, ctx): - logger.debug( - f"Skipping pre-supplied response for request {request_id}; " - f"no pending request found after checkpoint restoration." - ) - continue - - await request_info_executor.handle_response( - response_data, - request_id, - ctx, - ) + .. code-block:: python - events = [ - event - async for event in self._run_workflow_with_tracing( - initial_executor_fn=checkpoint_restoration, - reset_context=False, # Don't reset context when resuming from checkpoint - ) - ] - status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] - filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))] - return WorkflowRunResult(filtered_events, status_events) - finally: - self._reset_running_flag() + # First, get request ID from RequestInfoEvent + result = await workflow.run("task") + for event in result.events: + if isinstance(event, RequestInfoEvent): + request_id = event.request_id + # Then respond using the request_id + result = await workflow.run(responses={request_id: "approved"}) - async def send_responses(self, responses: dict[str, Any]) -> WorkflowRunResult: - """Send responses back to the workflow. + Resume and send HIL responses: - Args: - responses: A dictionary where keys are request IDs and values are the corresponding response data. + .. code-block:: python - Returns: - A WorkflowRunResult instance containing a list of events generated during the workflow execution. + result = await workflow.run(checkpoint_id="cp_123", responses={"req_1": "approved"}) """ + # Validate mutually exclusive parameters BEFORE setting running flag + if message is not None and checkpoint_id is not None: + raise ValueError("Cannot provide both 'message' and 'checkpoint_id'. Use one or the other.") + + if message is None and checkpoint_id is None and responses is None: + raise ValueError( + "Must provide at least one of: 'message' (new run), 'checkpoint_id' (resume), " + "or 'responses' (HIL continuation)." + ) + + # Prevent invalid combination: both message and responses provided + if message is not None and responses is not None: + raise ValueError( + "Cannot provide both 'message' and 'responses'. Use 'message' for new runs, " + "'responses' for HIL continuation." 
+ ) + self._ensure_not_running() + + # Enable runtime checkpointing if storage provided + if checkpoint_storage is not None: + self._runner.context.set_runtime_checkpoint_storage(checkpoint_storage) + try: - async def send_responses_internal() -> None: - request_info_executor = self._find_request_info_executor() - if not request_info_executor: - raise ValueError("No RequestInfoExecutor found in workflow.") - - async def _handle_response(response: Any, request_id: str) -> None: - """Handle the response from the RequestInfoExecutor.""" - await request_info_executor.handle_response( - response, - request_id, - WorkflowContext( + async def execution_handler() -> None: + # Handle checkpoint restoration + if checkpoint_id is not None: + has_checkpointing = self._runner.context.has_checkpointing() + + if not has_checkpointing and checkpoint_storage is None: + raise ValueError( + "Cannot restore from checkpoint: either provide checkpoint_storage parameter " + "or build workflow with WorkflowBuilder.with_checkpointing(checkpoint_storage)." + ) + + restored = await self._runner.restore_from_checkpoint(checkpoint_id, checkpoint_storage) + + if not restored: + raise RuntimeError(f"Failed to restore from checkpoint: {checkpoint_id}") + + # Process pending messages from checkpoint + if await self._runner.context.has_messages(): + await self._runner._run_iteration() # type: ignore + + # Handle initial message + elif message is not None: + executor = self.get_start_executor() + await executor.execute( + message, + [self.__class__.__name__], + self._shared_state, + self._runner.context, + trace_contexts=None, + source_span_ids=None, + ) + + # Handle HIL responses + if responses: + request_info_executor = self._find_request_info_executor() + if not request_info_executor: + raise ValueError("No RequestInfoExecutor found in workflow.") + + async def _handle_response(response: Any, request_id: str) -> None: + ctx: WorkflowContext[Any] = WorkflowContext( request_info_executor.id, [self.__class__.__name__], self._shared_state, self._runner.context, - trace_contexts=None, # No parent trace context for new workflow span - source_span_ids=None, # No source span for response handling - ), - ) + trace_contexts=None, + source_span_ids=None, + ) + + if checkpoint_id and not await request_info_executor.has_pending_request(request_id, ctx): + logger.debug( + f"Skipping response for request {request_id}; " + f"no pending request found after checkpoint restoration." 
+ ) + return + + await request_info_executor.handle_response(response, request_id, ctx) + + await asyncio.gather(*[ + _handle_response(response, request_id) for request_id, response in responses.items() + ]) - await asyncio.gather(*[ - _handle_response(response, request_id) for request_id, response in responses.items() - ]) + # Reset context only for new runs (not checkpoint restoration or HIL continuation) + reset_context = message is not None and checkpoint_id is None and responses is None - events = [ + raw_events = [ event async for event in self._run_workflow_with_tracing( - initial_executor_fn=send_responses_internal, - reset_context=False, # Don't reset context when sending responses + initial_executor_fn=execution_handler, + reset_context=reset_context, ) ] - status_events = [e for e in events if isinstance(e, WorkflowStatusEvent)] - filtered_events = [e for e in events if not isinstance(e, (WorkflowStatusEvent, WorkflowStartedEvent))] - return WorkflowRunResult(filtered_events, status_events) finally: + # Clear runtime checkpoint storage after run completes + if checkpoint_storage is not None: + self._runner.context.clear_runtime_checkpoint_storage() self._reset_running_flag() + # Filter events for non-streaming mode + filtered: list[WorkflowEvent] = [] + status_events: list[WorkflowStatusEvent] = [] + + for ev in raw_events: + # Omit WorkflowStartedEvent from non-streaming (telemetry-only) + if isinstance(ev, WorkflowStartedEvent): + continue + # Track status; include inline only if explicitly requested + if isinstance(ev, WorkflowStatusEvent): + status_events.append(ev) + if include_status_events: + filtered.append(ev) + continue + filtered.append(ev) + + return WorkflowRunResult(filtered, status_events) + def _get_executor_by_id(self, executor_id: str) -> Executor: """Get an executor by its ID. 
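
Taken together, the unified `run()`/`run_stream()` signatures replace the removed entry points (`run_from_checkpoint`, `run_stream_from_checkpoint`, `send_responses`, `send_responses_streaming`). The sketch below strings the three call patterns together end to end; it assumes a workflow built elsewhere with a RequestInfoExecutor in its graph, and the `main` helper, the `./checkpoints` path, the `"start message"` task, and the `"approved"` payload are placeholders. `FileCheckpointStorage`, `RequestInfoEvent` (assumed to be a public export, as used in the tests below), `list_checkpoints()`, and `request_id` are the names this patch relies on.

    from agent_framework import RequestInfoEvent
    from agent_framework._workflows._checkpoint import FileCheckpointStorage


    async def main(workflow) -> None:
        storage = FileCheckpointStorage("./checkpoints")

        # 1. New run with checkpointing enabled only for this run (runtime override).
        result = await workflow.run("start message", checkpoint_storage=storage)

        # 2. If the run paused on a human-in-the-loop request, answer it by request_id.
        #    WorkflowRunResult extends list, so the events can be filtered directly.
        pending = [ev for ev in result if isinstance(ev, RequestInfoEvent)]
        if pending:
            result = await workflow.run(responses={pending[-1].request_id: "approved"})

        # 3. Later, resume from a saved checkpoint; the storage can come from build time or runtime.
        checkpoints = await storage.list_checkpoints()
        if checkpoints:
            result = await workflow.run(
                checkpoint_id=checkpoints[-1].checkpoint_id,
                checkpoint_storage=storage,
            )

    # Invoke with asyncio.run(main(workflow)) once a concrete workflow instance exists.
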
diff --git a/python/packages/core/agent_framework/_workflows/_workflow_executor.py b/python/packages/core/agent_framework/_workflows/_workflow_executor.py index 1d56eac4d8..f56c5de25a 100644 --- a/python/packages/core/agent_framework/_workflows/_workflow_executor.py +++ b/python/packages/core/agent_framework/_workflows/_workflow_executor.py @@ -485,7 +485,7 @@ async def handle_response( try: # Resume the sub-workflow with all collected responses - result = await self.workflow.send_responses(responses_to_send) + result = await self.workflow.run(responses=responses_to_send) # Process the workflow result using shared logic await self._process_workflow_result(result, execution_context, ctx) diff --git a/python/packages/core/tests/workflow/test_checkpoint_validation.py b/python/packages/core/tests/workflow/test_checkpoint_validation.py index 361758f3eb..9736660ed8 100644 --- a/python/packages/core/tests/workflow/test_checkpoint_validation.py +++ b/python/packages/core/tests/workflow/test_checkpoint_validation.py @@ -46,8 +46,8 @@ async def test_resume_fails_when_graph_mismatch() -> None: with pytest.raises(ValueError, match="Workflow graph has changed"): _ = [ event - async for event in mismatched_workflow.run_stream_from_checkpoint( - target_checkpoint.checkpoint_id, + async for event in mismatched_workflow.run_stream( + checkpoint_id=target_checkpoint.checkpoint_id, checkpoint_storage=storage, ) ] @@ -65,8 +65,8 @@ async def test_resume_succeeds_when_graph_matches() -> None: events = [ event - async for event in resumed_workflow.run_stream_from_checkpoint( - target_checkpoint.checkpoint_id, + async for event in resumed_workflow.run_stream( + checkpoint_id=target_checkpoint.checkpoint_id, checkpoint_storage=storage, ) ] diff --git a/python/packages/core/tests/workflow/test_concurrent.py b/python/packages/core/tests/workflow/test_concurrent.py index 3317685e5f..db70be3f38 100644 --- a/python/packages/core/tests/workflow/test_concurrent.py +++ b/python/packages/core/tests/workflow/test_concurrent.py @@ -195,7 +195,7 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None: wf_resume = ConcurrentBuilder().participants(list(resumed_participants)).with_checkpointing(storage).build() resumed_output: list[ChatMessage] | None = None - async for ev in wf_resume.run_stream_from_checkpoint(resume_checkpoint.checkpoint_id): + async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id): if isinstance(ev, WorkflowOutputEvent): resumed_output = ev.data # type: ignore[assignment] if isinstance(ev, WorkflowStatusEvent) and ev.state in ( @@ -207,3 +207,74 @@ async def test_concurrent_checkpoint_resume_round_trip() -> None: assert resumed_output is not None assert [m.role for m in resumed_output] == [m.role for m in baseline_output] assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + + +async def test_concurrent_checkpoint_runtime_only() -> None: + """Test checkpointing configured ONLY at runtime, not at build time.""" + storage = InMemoryCheckpointStorage() + + agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")] + wf = ConcurrentBuilder().participants(agents).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + break + + assert 
baseline_output is not None + + checkpoints = await storage.list_checkpoints() + assert checkpoints + checkpoints.sort(key=lambda cp: cp.timestamp) + + resume_checkpoint = next( + (cp for cp in checkpoints if (cp.metadata or {}).get("checkpoint_type") == "superstep"), + checkpoints[-1], + ) + + resumed_agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")] + wf_resume = ConcurrentBuilder().participants(resumed_agents).build() + + resumed_output: list[ChatMessage] | None = None + async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + resumed_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert resumed_output is not None + assert [m.role for m in resumed_output] == [m.role for m in baseline_output] + + +async def test_concurrent_checkpoint_runtime_overrides_buildtime() -> None: + """Test that runtime checkpoint storage overrides build-time configuration.""" + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2: + from agent_framework._workflows._checkpoint import FileCheckpointStorage + + buildtime_storage = FileCheckpointStorage(temp_dir1) + runtime_storage = FileCheckpointStorage(temp_dir2) + + agents = [_FakeAgentExec(id="agent1", reply_text="A1"), _FakeAgentExec(id="agent2", reply_text="A2")] + wf = ConcurrentBuilder().participants(agents).with_checkpointing(buildtime_storage).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + break + + assert baseline_output is not None + + buildtime_checkpoints = await buildtime_storage.list_checkpoints() + runtime_checkpoints = await runtime_storage.list_checkpoints() + + assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" + assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" diff --git a/python/packages/core/tests/workflow/test_group_chat.py b/python/packages/core/tests/workflow/test_group_chat.py index 01942a8703..ab920d0663 100644 --- a/python/packages/core/tests/workflow/test_group_chat.py +++ b/python/packages/core/tests/workflow/test_group_chat.py @@ -742,3 +742,73 @@ def selector(state: GroupChatStateSnapshot) -> str | None: # The last message should be about round limit final_output = outputs[-1] assert "round limit" in final_output.text.lower() + + +async def test_group_chat_checkpoint_runtime_only() -> None: + """Test checkpointing configured ONLY at runtime, not at build time.""" + from agent_framework import WorkflowRunState, WorkflowStatusEvent + + storage = InMemoryCheckpointStorage() + + agent_a = StubAgent("agentA", "Reply from A") + agent_b = StubAgent("agentB", "Reply from B") + selector = make_sequence_selector() + + wf = GroupChatBuilder().participants([agent_a, agent_b]).select_speakers(selector).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: 
ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert baseline_output is not None + + checkpoints = await storage.list_checkpoints() + assert len(checkpoints) > 0, "Runtime-only checkpointing should have created checkpoints" + + +async def test_group_chat_checkpoint_runtime_overrides_buildtime() -> None: + """Test that runtime checkpoint storage overrides build-time configuration.""" + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2: + from agent_framework import WorkflowRunState, WorkflowStatusEvent + from agent_framework._workflows._checkpoint import FileCheckpointStorage + + buildtime_storage = FileCheckpointStorage(temp_dir1) + runtime_storage = FileCheckpointStorage(temp_dir2) + + agent_a = StubAgent("agentA", "Reply from A") + agent_b = StubAgent("agentB", "Reply from B") + selector = make_sequence_selector() + + wf = ( + GroupChatBuilder() + .participants([agent_a, agent_b]) + .select_speakers(selector) + .with_checkpointing(buildtime_storage) + .build() + ) + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert baseline_output is not None + + buildtime_checkpoints = await buildtime_storage.list_checkpoints() + runtime_checkpoints = await runtime_storage.list_checkpoints() + + assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" + assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" diff --git a/python/packages/core/tests/workflow/test_handoff.py b/python/packages/core/tests/workflow/test_handoff.py index 12d115ad40..0d6c3fd9dd 100644 --- a/python/packages/core/tests/workflow/test_handoff.py +++ b/python/packages/core/tests/workflow/test_handoff.py @@ -176,7 +176,7 @@ async def test_handoff_routes_to_specialist_and_requests_user_input(): assert request_payload.conversation[3].role == Role.ASSISTANT assert "specialist reply" in request_payload.conversation[3].text - follow_up = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Thanks"})) + follow_up = await _drain(workflow.run_stream(responses={requests[-1].request_id: "Thanks"})) assert any(isinstance(ev, RequestInfoEvent) for ev in follow_up) @@ -204,7 +204,7 @@ async def test_specialist_to_specialist_handoff(): assert len(specialist.calls) > 0 # Second user message - specialist hands off to escalation - events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "This is complex"})) + events = await _drain(workflow.run_stream(responses={requests[-1].request_id: "This is complex"})) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert outputs @@ -256,9 +256,7 @@ async def test_handoff_preserves_complex_additional_properties(complex_metadata: assert restored_meta.payload["code"] == "X1" # Respond and ensure metadata survives subsequent cycles - follow_up_events = await _drain( - workflow.send_responses_streaming({requests[-1].request_id: "Here are more details"}) - ) + follow_up_events = await _drain(workflow.run_stream(responses={requests[-1].request_id: "Here are more 
details"})) follow_up_requests = [ev for ev in follow_up_events if isinstance(ev, RequestInfoEvent)] outputs = [ev for ev in follow_up_events if isinstance(ev, WorkflowOutputEvent)] @@ -325,7 +323,7 @@ async def test_multiple_runs_dont_leak_conversation(): events = await _drain(workflow.run_stream("First run message")) requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] assert requests - events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Second message"})) + events = await _drain(workflow.run_stream(responses={requests[-1].request_id: "Second message"})) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert outputs, "First run should emit output" @@ -343,7 +341,7 @@ async def test_multiple_runs_dont_leak_conversation(): events = await _drain(workflow.run_stream("Second run different message")) requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] assert requests - events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Another message"})) + events = await _drain(workflow.run_stream(responses={requests[-1].request_id: "Another message"})) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert outputs, "Second run should emit output" @@ -383,7 +381,7 @@ async def async_termination(conv: list[ChatMessage]) -> bool: requests = [ev for ev in events if isinstance(ev, RequestInfoEvent)] assert requests - events = await _drain(workflow.send_responses_streaming({requests[-1].request_id: "Second user message"})) + events = await _drain(workflow.run_stream(responses={requests[-1].request_id: "Second user message"})) outputs = [ev for ev in events if isinstance(ev, WorkflowOutputEvent)] assert len(outputs) == 1 diff --git a/python/packages/core/tests/workflow/test_magentic.py b/python/packages/core/tests/workflow/test_magentic.py index fe83fa0ea4..b6a26c711e 100644 --- a/python/packages/core/tests/workflow/test_magentic.py +++ b/python/packages/core/tests/workflow/test_magentic.py @@ -203,9 +203,9 @@ async def test_magentic_workflow_plan_review_approval_to_completion(): completed = False output: ChatMessage | None = None - async for ev in wf.send_responses_streaming({ - req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) - }): + async for ev in wf.run_stream( + responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)} + ): if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: completed = True elif isinstance(ev, WorkflowOutputEvent): @@ -248,12 +248,14 @@ async def replan(self, magentic_context: MagenticContext) -> ChatMessage: # typ # Reply APPROVE with comments (no edited text). Expect one replan and no second review round. 
saw_second_review = False completed = False - async for ev in wf.send_responses_streaming({ - req_event.request_id: MagenticPlanReviewReply( - decision=MagenticPlanReviewDecision.APPROVE, - comments="Looks good; consider Z", - ) - }): + async for ev in wf.run_stream( + responses={ + req_event.request_id: MagenticPlanReviewReply( + decision=MagenticPlanReviewDecision.APPROVE, + comments="Looks good; consider Z", + ) + } + ): if isinstance(ev, RequestInfoEvent) and ev.request_type is MagenticPlanReviewRequest: saw_second_review = True if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: @@ -334,8 +336,8 @@ async def test_magentic_checkpoint_resume_round_trip(): reply = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) completed: WorkflowOutputEvent | None = None - async for event in wf_resume.run_stream_from_checkpoint( - resume_checkpoint.checkpoint_id, + async for event in wf_resume.run_stream( + checkpoint_id=resume_checkpoint.checkpoint_id, responses={req_event.request_id: reply}, ): if isinstance(event, WorkflowOutputEvent): @@ -599,7 +601,7 @@ async def test_magentic_checkpoint_resume_inner_loop_superstep(): ) completed: WorkflowOutputEvent | None = None - async for event in resumed.run_stream_from_checkpoint(inner_loop_checkpoint.checkpoint_id): # type: ignore[reportUnknownMemberType] + async for event in resumed.run_stream(checkpoint_id=inner_loop_checkpoint.checkpoint_id): # type: ignore[reportUnknownMemberType] if isinstance(event, WorkflowOutputEvent): completed = event @@ -641,7 +643,7 @@ async def test_magentic_checkpoint_resume_after_reset(): ) completed: WorkflowOutputEvent | None = None - async for event in resumed_workflow.run_stream_from_checkpoint(resumed_state.checkpoint_id): + async for event in resumed_workflow.run_stream(checkpoint_id=resumed_state.checkpoint_id): if isinstance(event, WorkflowOutputEvent): completed = event @@ -683,8 +685,8 @@ async def test_magentic_checkpoint_resume_rejects_participant_renames(): ) with pytest.raises(ValueError, match="Workflow graph has changed"): - async for _ in renamed_workflow.run_stream_from_checkpoint( - target_checkpoint.checkpoint_id, # type: ignore[reportUnknownMemberType] + async for _ in renamed_workflow.run_stream( + checkpoint_id=target_checkpoint.checkpoint_id, # type: ignore[reportUnknownMemberType] responses={req_event.request_id: MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE)}, ): pass @@ -732,3 +734,66 @@ async def test_magentic_stall_and_reset_successfully(): assert isinstance(output_event.data, ChatMessage) assert output_event.data.text is not None assert output_event.data.text == "re-ledger" + + +async def test_magentic_checkpoint_runtime_only() -> None: + """Test checkpointing configured ONLY at runtime, not at build time.""" + storage = InMemoryCheckpointStorage() + + manager = FakeManager(max_round_count=10) + manager.satisfied_after_signoff = True + wf = MagenticBuilder().participants(agentA=_DummyExec("agentA")).with_standard_manager(manager).build() + + baseline_output: ChatMessage | None = None + async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert baseline_output is not None + + checkpoints = await storage.list_checkpoints() + assert len(checkpoints) > 0, 
"Runtime-only checkpointing should have created checkpoints" + + +async def test_magentic_checkpoint_runtime_overrides_buildtime() -> None: + """Test that runtime checkpoint storage overrides build-time configuration.""" + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2: + from agent_framework._workflows._checkpoint import FileCheckpointStorage + + buildtime_storage = FileCheckpointStorage(temp_dir1) + runtime_storage = FileCheckpointStorage(temp_dir2) + + manager = FakeManager(max_round_count=10) + manager.satisfied_after_signoff = True + wf = ( + MagenticBuilder() + .participants(agentA=_DummyExec("agentA")) + .with_standard_manager(manager) + .with_checkpointing(buildtime_storage) + .build() + ) + + baseline_output: ChatMessage | None = None + async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert baseline_output is not None + + buildtime_checkpoints = await buildtime_storage.list_checkpoints() + runtime_checkpoints = await runtime_storage.list_checkpoints() + + assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" + assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" diff --git a/python/packages/core/tests/workflow/test_sequential.py b/python/packages/core/tests/workflow/test_sequential.py index 54df5b1638..165d764725 100644 --- a/python/packages/core/tests/workflow/test_sequential.py +++ b/python/packages/core/tests/workflow/test_sequential.py @@ -145,7 +145,7 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: wf_resume = SequentialBuilder().participants(list(resumed_agents)).with_checkpointing(storage).build() resumed_output: list[ChatMessage] | None = None - async for ev in wf_resume.run_stream_from_checkpoint(resume_checkpoint.checkpoint_id): + async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id): if isinstance(ev, WorkflowOutputEvent): resumed_output = ev.data # type: ignore[assignment] if isinstance(ev, WorkflowStatusEvent) and ev.state in ( @@ -157,3 +157,75 @@ async def test_sequential_checkpoint_resume_round_trip() -> None: assert resumed_output is not None assert [m.role for m in resumed_output] == [m.role for m in baseline_output] assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + + +async def test_sequential_checkpoint_runtime_only() -> None: + """Test checkpointing configured ONLY at runtime, not at build time.""" + storage = InMemoryCheckpointStorage() + + agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) + wf = SequentialBuilder().participants(list(agents)).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("runtime checkpoint test", checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + break + + assert baseline_output is not None + + checkpoints = await storage.list_checkpoints() + assert checkpoints + checkpoints.sort(key=lambda cp: cp.timestamp) + + resume_checkpoint = next( + (cp for cp in checkpoints if (cp.metadata or {}).get("checkpoint_type") == 
"superstep"), + checkpoints[-1], + ) + + resumed_agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) + wf_resume = SequentialBuilder().participants(list(resumed_agents)).build() + + resumed_output: list[ChatMessage] | None = None + async for ev in wf_resume.run_stream(checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage): + if isinstance(ev, WorkflowOutputEvent): + resumed_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state in ( + WorkflowRunState.IDLE, + WorkflowRunState.IDLE_WITH_PENDING_REQUESTS, + ): + break + + assert resumed_output is not None + assert [m.role for m in resumed_output] == [m.role for m in baseline_output] + assert [m.text for m in resumed_output] == [m.text for m in baseline_output] + + +async def test_sequential_checkpoint_runtime_overrides_buildtime() -> None: + """Test that runtime checkpoint storage overrides build-time configuration.""" + import tempfile + + with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2: + from agent_framework._workflows._checkpoint import FileCheckpointStorage + + buildtime_storage = FileCheckpointStorage(temp_dir1) + runtime_storage = FileCheckpointStorage(temp_dir2) + + agents = (_EchoAgent(id="agent1", name="A1"), _EchoAgent(id="agent2", name="A2")) + wf = SequentialBuilder().participants(list(agents)).with_checkpointing(buildtime_storage).build() + + baseline_output: list[ChatMessage] | None = None + async for ev in wf.run_stream("override test", checkpoint_storage=runtime_storage): + if isinstance(ev, WorkflowOutputEvent): + baseline_output = ev.data # type: ignore[assignment] + if isinstance(ev, WorkflowStatusEvent) and ev.state == WorkflowRunState.IDLE: + break + + assert baseline_output is not None + + buildtime_checkpoints = await buildtime_storage.list_checkpoints() + runtime_checkpoints = await runtime_storage.list_checkpoints() + + assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" + assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" diff --git a/python/packages/core/tests/workflow/test_sub_workflow.py b/python/packages/core/tests/workflow/test_sub_workflow.py index 2c787fe658..701706fb05 100644 --- a/python/packages/core/tests/workflow/test_sub_workflow.py +++ b/python/packages/core/tests/workflow/test_sub_workflow.py @@ -204,9 +204,11 @@ async def test_basic_sub_workflow() -> None: assert request_events[0].data.domain == "example.com" # Send response through the main workflow - await main_workflow.send_responses({ - request_events[0].request_id: True # Domain is approved - }) + await main_workflow.run( + responses={ + request_events[0].request_id: True # Domain is approved + } + ) # Check result assert parent.result is not None @@ -251,9 +253,11 @@ async def test_sub_workflow_with_interception(): assert request_events[0].data.domain == "unknown.com" # Send external response - await main_workflow.send_responses({ - request_events[0].request_id: False # Domain not approved - }) + await main_workflow.run( + responses={ + request_events[0].request_id: False # Domain not approved + } + ) assert parent.result is not None assert parent.result.email == "user@unknown.com" assert parent.result.is_valid is False @@ -413,7 +417,7 @@ async def collect_result(self, result: ValidationResult, ctx: WorkflowContext) - # Send responses for all requests (approve all domains) responses = {event.request_id: True for event in 
request_events} - await main_workflow.send_responses(responses) + await main_workflow.run(responses=responses) # All results should be collected assert len(processor.results) == len(emails) diff --git a/python/packages/core/tests/workflow/test_workflow.py b/python/packages/core/tests/workflow/test_workflow.py index f66c7048e8..97052edf1b 100644 --- a/python/packages/core/tests/workflow/test_workflow.py +++ b/python/packages/core/tests/workflow/test_workflow.py @@ -202,9 +202,7 @@ async def test_workflow_send_responses_streaming(): assert request_info_event is not None result: int | None = None completed = False - async for event in workflow.send_responses_streaming({ - request_info_event.request_id: ApprovalMessage(approved=True) - }): + async for event in workflow.run_stream(responses={request_info_event.request_id: ApprovalMessage(approved=True)}): if isinstance(event, WorkflowOutputEvent): result = event.data elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE: @@ -236,7 +234,7 @@ async def test_workflow_send_responses(): assert len(request_info_events) == 1 - result = await workflow.send_responses({request_info_events[0].request_id: ApprovalMessage(approved=True)}) + result = await workflow.run(responses={request_info_events[0].request_id: ApprovalMessage(approved=True)}) assert result.get_final_state() == WorkflowRunState.IDLE outputs = result.get_outputs() @@ -353,7 +351,7 @@ async def test_workflow_checkpointing_not_enabled_for_external_restore(simple_ex # Attempt to restore from checkpoint without providing external storage should fail try: - [event async for event in workflow.run_stream_from_checkpoint("fake-checkpoint-id")] + [event async for event in workflow.run_stream(checkpoint_id="fake-checkpoint-id")] raise AssertionError("Expected ValueError to be raised") except ValueError as e: assert "Cannot restore from checkpoint" in str(e) @@ -371,7 +369,7 @@ async def test_workflow_run_stream_from_checkpoint_no_checkpointing_enabled(simp # Attempt to run from checkpoint should fail try: - async for _ in workflow.run_stream_from_checkpoint("fake_checkpoint_id"): + async for _ in workflow.run_stream(checkpoint_id="fake_checkpoint_id"): pass raise AssertionError("Expected ValueError to be raised") except ValueError as e: @@ -395,7 +393,7 @@ async def test_workflow_run_stream_from_checkpoint_invalid_checkpoint(simple_exe # Attempt to run from non-existent checkpoint should fail try: - async for _ in workflow.run_stream_from_checkpoint("nonexistent_checkpoint_id"): + async for _ in workflow.run_stream(checkpoint_id="nonexistent_checkpoint_id"): pass raise AssertionError("Expected RuntimeError to be raised") except RuntimeError as e: @@ -426,8 +424,8 @@ async def test_workflow_run_stream_from_checkpoint_with_external_storage(simple_ # Resume from checkpoint using external storage parameter try: events: list[WorkflowEvent] = [] - async for event in workflow_without_checkpointing.run_stream_from_checkpoint( - checkpoint_id, checkpoint_storage=storage + async for event in workflow_without_checkpointing.run_stream( + checkpoint_id=checkpoint_id, checkpoint_storage=storage ): events.append(event) if len(events) >= 2: # Limit to avoid infinite loops @@ -462,8 +460,8 @@ async def test_workflow_run_from_checkpoint_non_streaming(simple_executor: Execu .build() ) - # Test non-streaming run_from_checkpoint method - result = await workflow.run_from_checkpoint(checkpoint_id) + # Test non-streaming run method with checkpoint_id + result = await 
workflow.run(checkpoint_id=checkpoint_id) assert isinstance(result, list) # Should return WorkflowRunResult which extends list assert hasattr(result, "get_outputs") # Should have WorkflowRunResult methods @@ -493,12 +491,12 @@ async def test_workflow_run_stream_from_checkpoint_with_responses(simple_executo .build() ) - # Test that run_stream_from_checkpoint accepts responses parameter + # Test that run_stream accepts checkpoint_id and responses parameters responses = {"request_123": {"data": "test_response"}} try: events: list[WorkflowEvent] = [] - async for event in workflow.run_stream_from_checkpoint(checkpoint_id, responses=responses): + async for event in workflow.run_stream(checkpoint_id=checkpoint_id, responses=responses): events.append(event) if len(events) >= 2: # Limit to avoid infinite loops break @@ -580,6 +578,74 @@ async def test_workflow_multiple_runs_no_state_collision(): assert outputs1[0] != outputs3[0] +async def test_workflow_checkpoint_runtime_only_configuration(simple_executor: Executor): + """Test that checkpointing can be configured ONLY at runtime, not at build time.""" + with tempfile.TemporaryDirectory() as temp_dir: + storage = FileCheckpointStorage(temp_dir) + + # Build workflow WITHOUT checkpointing at build time + workflow = ( + WorkflowBuilder().add_edge(simple_executor, simple_executor).set_start_executor(simple_executor).build() + ) + + # Run with runtime checkpoint storage - should create checkpoints + test_message = Message(data="runtime checkpoint test", source_id="test", target_id=None) + result = await workflow.run(test_message, checkpoint_storage=storage) + assert result is not None + assert result.get_final_state() == WorkflowRunState.IDLE + + # Verify checkpoints were created + checkpoints = await storage.list_checkpoints() + assert len(checkpoints) > 0 + + # Find a superstep checkpoint to resume from + checkpoints.sort(key=lambda cp: cp.timestamp) + resume_checkpoint = next( + (cp for cp in checkpoints if (cp.metadata or {}).get("checkpoint_type") == "superstep"), + checkpoints[-1], + ) + + # Create new workflow instance (still without build-time checkpointing) + workflow_resume = ( + WorkflowBuilder().add_edge(simple_executor, simple_executor).set_start_executor(simple_executor).build() + ) + + # Resume from checkpoint using runtime checkpoint storage + result_resumed = await workflow_resume.run( + checkpoint_id=resume_checkpoint.checkpoint_id, checkpoint_storage=storage + ) + assert result_resumed is not None + assert result_resumed.get_final_state() in (WorkflowRunState.IDLE, WorkflowRunState.IDLE_WITH_PENDING_REQUESTS) + + +async def test_workflow_checkpoint_runtime_overrides_buildtime(simple_executor: Executor): + """Test that runtime checkpoint storage overrides build-time configuration.""" + with tempfile.TemporaryDirectory() as temp_dir1, tempfile.TemporaryDirectory() as temp_dir2: + buildtime_storage = FileCheckpointStorage(temp_dir1) + runtime_storage = FileCheckpointStorage(temp_dir2) + + # Build workflow with build-time checkpointing + workflow = ( + WorkflowBuilder() + .add_edge(simple_executor, simple_executor) + .set_start_executor(simple_executor) + .with_checkpointing(buildtime_storage) + .build() + ) + + # Run with runtime checkpoint storage override + test_message = Message(data="override test", source_id="test", target_id=None) + result = await workflow.run(test_message, checkpoint_storage=runtime_storage) + assert result is not None + + # Verify checkpoints were created in runtime storage, not build-time storage + 
buildtime_checkpoints = await buildtime_storage.list_checkpoints() + runtime_checkpoints = await runtime_storage.list_checkpoints() + + assert len(runtime_checkpoints) > 0, "Runtime storage should have checkpoints" + assert len(buildtime_checkpoints) == 0, "Build-time storage should have no checkpoints when overridden" + + async def test_comprehensive_edge_groups_workflow(): """Test a workflow that uses SwitchCaseEdgeGroup, FanOutEdgeGroup, and FanInEdgeGroup.""" from agent_framework import Case, Default @@ -786,7 +852,7 @@ async def consume_stream(): break with pytest.raises(RuntimeError, match="Workflow is already running. Concurrent executions are not allowed."): - await workflow.send_responses({"test": "data"}) + await workflow.run(responses={"test": "data"}) # Wait for the original task to complete await task1 @@ -867,3 +933,92 @@ async def test_agent_streaming_vs_non_streaming() -> None: e.data.contents[0].text for e in stream_agent_update_events if e.data.contents and e.data.contents[0].text ) assert accumulated_text == "Hello World", f"Expected 'Hello World', got '{accumulated_text}'" + + +async def test_workflow_run_parameter_validation(simple_executor: Executor) -> None: + """Test that run() and run_stream() properly validate parameter combinations.""" + workflow = WorkflowBuilder().add_edge(simple_executor, simple_executor).set_start_executor(simple_executor).build() + + test_message = Message(data="test", source_id="test", target_id=None) + + # Valid: message only (new run) + result = await workflow.run(test_message) + assert result.get_final_state() == WorkflowRunState.IDLE + + # Valid: responses only (HIL continuation) - requires RequestInfoExecutor + # This test validates that the parameter is accepted, not full HIL flow + # Actual HIL flow is tested in dedicated HIL tests + + # Invalid: both message and checkpoint_id + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): + await workflow.run(test_message, checkpoint_id="fake_id") + + # Invalid: both message and checkpoint_id (streaming) + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'checkpoint_id'"): + async for _ in workflow.run_stream(test_message, checkpoint_id="fake_id"): + pass + + # Invalid: both message and responses + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'responses'"): + await workflow.run(test_message, responses={"req_id": "response"}) + + # Invalid: both message and responses (streaming) + with pytest.raises(ValueError, match="Cannot provide both 'message' and 'responses'"): + async for _ in workflow.run_stream(test_message, responses={"req_id": "response"}): + pass + + # Invalid: none of message, checkpoint_id, or responses + with pytest.raises(ValueError, match="Must provide at least one of"): + await workflow.run() + + # Invalid: none of message, checkpoint_id, or responses (streaming) + with pytest.raises(ValueError, match="Must provide at least one of"): + async for _ in workflow.run_stream(): + pass + + +async def test_workflow_run_stream_parameter_validation(simple_executor: Executor) -> None: + """Test run_stream() specific parameter validation scenarios.""" + workflow = WorkflowBuilder().add_edge(simple_executor, simple_executor).set_start_executor(simple_executor).build() + + test_message = Message(data="test", source_id="test", target_id=None) + + # Valid: message only (new run) + events = [] + async for event in workflow.run_stream(test_message): + events.append(event) + assert any(isinstance(e, 
WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE for e in events) + + # Invalid combinations already tested in test_workflow_run_parameter_validation + # This test ensures streaming works correctly for valid parameters + + +async def test_workflow_checkpoint_and_responses_combination(simple_executor: Executor) -> None: + """Test that checkpoint_id + responses is a valid combination (checkpoint resume with HIL).""" + with tempfile.TemporaryDirectory() as temp_dir: + from agent_framework import RequestInfoExecutor + + storage = FileCheckpointStorage(temp_dir) + + # Build workflow with RequestInfoExecutor for HIL testing + request_info = RequestInfoExecutor(id="request_info") + workflow = ( + WorkflowBuilder() + .set_start_executor(simple_executor) + .add_edge(simple_executor, request_info) + .add_edge(request_info, simple_executor) + .with_checkpointing(storage) + .build() + ) + + # This validates that checkpoint_id + responses is allowed (for checkpoint resume + HIL) + # The actual execution may fail due to invalid checkpoint_id, but the parameter + # validation should pass + try: + async for _ in workflow.run_stream(checkpoint_id="fake_checkpoint", responses={"req_id": "response"}): + pass + except (ValueError, RuntimeError) as e: + # We expect a checkpoint restoration error, not a parameter validation error + assert "Cannot provide both" not in str(e), "Parameter validation should allow checkpoint_id + responses" + # Expected errors: "Cannot restore from checkpoint" or "Failed to restore" + assert "checkpoint" in str(e).lower() or "restore" in str(e).lower() diff --git a/python/samples/getting_started/workflows/_start-here/step1_executors_and_edges.py b/python/samples/getting_started/workflows/_start-here/step1_executors_and_edges.py index 9f67f6f7d8..b5c80062dd 100644 --- a/python/samples/getting_started/workflows/_start-here/step1_executors_and_edges.py +++ b/python/samples/getting_started/workflows/_start-here/step1_executors_and_edges.py @@ -117,14 +117,14 @@ async def main(): # retrieves the outputs yielded by any terminal nodes. events = await workflow.run("hello world") print(events.get_outputs()) - # Summarize the final run state (e.g., COMPLETED) + # Summarize the final run state (e.g., IDLE) print("Final state:", events.get_final_state()) """ Sample Output: ['DLROW OLLEH'] - Final state: WorkflowRunState.COMPLETED + Final state: WorkflowRunState.IDLE """ diff --git a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py index aa8efa9cc1..aa9a1d9f40 100644 --- a/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py +++ b/python/samples/getting_started/workflows/agents/azure_chat_agents_tool_calls_with_feedback.py @@ -188,7 +188,7 @@ async def main() -> None: while not completed: last_executor: str | None = None stream = ( - workflow.send_responses_streaming(pending_responses) + workflow.run_stream(responses=pending_responses) if pending_responses is not None else workflow.run_stream( "Create a short launch blurb for the LumenX desk lamp. Emphasize adjustability and warm lighting." 
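The sample updates above and below converge on the same driver-loop shape under the unified API: start with run_stream(message), then feed answers back with run_stream(responses=...). A minimal sketch of that loop follows; the helper names (drive, answer_for) are illustrative only, and the top-level imports are assumed to match what the tests in this change set use.

from collections.abc import Callable
from typing import Any

from agent_framework import (  # assumed public exports
    RequestInfoEvent,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
)


async def drive(workflow: Any, task: str, answer_for: Callable[[RequestInfoEvent], Any]) -> None:
    """Run a workflow to completion, answering human-in-the-loop requests as they arrive."""
    pending_responses: dict[str, Any] | None = None
    completed = False
    while not completed:
        # First turn: pass the task message. Later turns: pass collected answers via `responses`.
        stream = (
            workflow.run_stream(responses=pending_responses)
            if pending_responses is not None
            else workflow.run_stream(task)
        )
        requests: list[RequestInfoEvent] = []
        async for event in stream:
            if isinstance(event, RequestInfoEvent):
                requests.append(event)
            elif isinstance(event, WorkflowOutputEvent):
                print("Output:", event.data)
            elif isinstance(event, WorkflowStatusEvent) and event.state == WorkflowRunState.IDLE:
                completed = True
        if not completed and not requests:
            break  # nothing left to answer and no output yet; avoid restarting the task
        # answer_for is an application-supplied callable (console prompt, UI handler, test stub).
        pending_responses = {req.request_id: answer_for(req) for req in requests} or None

The same shape appears in the handoff and guessing-game samples changed later in this patch; only the source of the answers differs.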
diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py index 76a3de5f01..4f877a3ab9 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_human_in_the_loop.py @@ -55,7 +55,7 @@ - Minimal executor pipeline with checkpoint persistence. - Human-in-the-loop pause/resume by pairing `RequestInfoExecutor` with checkpoint restoration. -- Supplying responses at restore time (`run_stream_from_checkpoint(..., responses=...)`). +- Supplying responses at restore time (`run_stream(checkpoint_id=..., responses=...)`). Typical pause/resume flow ------------------------- @@ -364,7 +364,7 @@ async def run_interactive_session(workflow: "Workflow", initial_message: str) -> first = False elif pending_responses: # Feed any answers the user just typed back into the workflow. - events = await _consume(workflow.send_responses_streaming(pending_responses)) + events = await _consume(workflow.run_stream(responses=pending_responses)) else: break @@ -385,8 +385,8 @@ async def resume_from_checkpoint( print(f"\nResuming from checkpoint: {checkpoint_id}") events = await _consume( - workflow.run_stream_from_checkpoint( - checkpoint_id, + workflow.run_stream( + checkpoint_id=checkpoint_id, checkpoint_storage=storage, responses=pre_supplied, ) @@ -400,7 +400,7 @@ async def resume_from_checkpoint( pending = _prompt_for_responses(requests) while completed_output is None and pending: - events = await _consume(workflow.send_responses_streaming(pending)) + events = await _consume(workflow.run_stream(responses=pending)) completed_output, requests = _print_events(events) if completed_output is None: pending = _prompt_for_responses(requests) diff --git a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py index 17fb44c87b..cb0c7705c5 100644 --- a/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py +++ b/python/samples/getting_started/workflows/checkpoint/checkpoint_with_resume.py @@ -47,7 +47,7 @@ - How to configure FileCheckpointStorage and call with_checkpointing on WorkflowBuilder. - How to list and inspect checkpoints programmatically. - How to interactively choose a checkpoint to resume from (instead of always resuming - from the most recent or a hard-coded one) using run_stream_from_checkpoint. + from the most recent or a hard-coded one) using run_stream. - How workflows complete by yielding outputs when idle, not via explicit completion events. 
Prerequisites: @@ -281,7 +281,7 @@ async def main(): new_workflow = create_workflow(checkpoint_storage=checkpoint_storage) print(f"\nResuming from checkpoint: {chosen_cp_id}") - async for event in new_workflow.run_stream_from_checkpoint(chosen_cp_id, checkpoint_storage=checkpoint_storage): + async for event in new_workflow.run_stream(checkpoint_id=chosen_cp_id, checkpoint_storage=checkpoint_storage): print(f"Resumed Event: {event}") """ diff --git a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py index 1591c6f049..16ce02a080 100644 --- a/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py +++ b/python/samples/getting_started/workflows/checkpoint/sub_workflow_checkpoint.py @@ -321,8 +321,8 @@ async def main() -> None: approval_response = "approve" final_event: WorkflowOutputEvent | None = None - async for event in workflow2.run_stream_from_checkpoint( - resume_checkpoint.checkpoint_id, + async for event in workflow2.run_stream( + checkpoint_id=resume_checkpoint.checkpoint_id, responses={request_id: approval_response}, ): if isinstance(event, WorkflowOutputEvent): diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py index e3c3652df0..ad3bddf3e3 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_parallel_requests.py @@ -407,7 +407,7 @@ async def main() -> None: external_responses[event.request_id] = policy_response print(f" šŸ”’ External policy: {'āœ… APPROVED' if policy_response.approved else 'āŒ DENIED'}") - await main_workflow.send_responses(external_responses) + await main_workflow.run(responses=external_responses) else: print("\nšŸŽÆ All requests were intercepted internally!") diff --git a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py index fae0f80e5f..cbc61014c3 100644 --- a/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py +++ b/python/samples/getting_started/workflows/composition/sub_workflow_request_interception.py @@ -275,7 +275,7 @@ async def run_example() -> None: external_responses[event.request_id] = approved # 9. Send external responses - await main_workflow.send_responses(external_responses) + await main_workflow.run(responses=external_responses) else: print("\nšŸŽÆ All requests were intercepted and handled locally!") diff --git a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py index 02aa758bea..e8f4867394 100644 --- a/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py +++ b/python/samples/getting_started/workflows/human-in-the-loop/guessing_game_with_human_input.py @@ -39,7 +39,7 @@ Demonstrate: - Alternating turns between an AgentExecutor and a human, driven by events. - Using Pydantic response_format to enforce structured JSON output from the agent instead of regex parsing. -- Driving the loop in application code with run_stream and send_responses_streaming. 
+- Driving the loop in application code with run_stream and responses parameter. Prerequisites: - Azure OpenAI configured for AzureOpenAIChatClient with required environment variables. @@ -52,7 +52,7 @@ # emits a RequestInfoEvent with a typed payload, and then resumes the graph only after your application # supplies a matching RequestResponse keyed by the emitted request_id. It does not gather input by itself. # Your application is responsible for collecting the human reply from any UI or CLI and then calling -# send_responses_streaming with a dict mapping request_id to the human's answer. The executor exists to +# run_stream with a responses parameter mapping request_id to the human's answer. The executor exists to # standardize pause-and-resume human gating, to carry typed request payloads, and to preserve correlation. @@ -208,10 +208,8 @@ async def main() -> None: while not completed: # First iteration uses run_stream("start"). - # Subsequent iterations use send_responses_streaming with pending_responses from the console. - stream = ( - workflow.send_responses_streaming(pending_responses) if pending_responses else workflow.run_stream("start") - ) + # Subsequent iterations use run_stream with pending_responses from the console. + stream = workflow.run_stream(responses=pending_responses) if pending_responses else workflow.run_stream("start") # Collect events for this turn. Among these you may see WorkflowStatusEvent # with state IDLE_WITH_PENDING_REQUESTS when the workflow pauses for # human input, preceded by IN_PROGRESS_PENDING_REQUESTS as requests are diff --git a/python/samples/getting_started/workflows/orchestration/handoff_simple.py b/python/samples/getting_started/workflows/orchestration/handoff_simple.py index 6092083266..f308e44355 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_simple.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_simple.py @@ -244,7 +244,7 @@ async def main() -> None: responses = {req.request_id: user_response for req in pending_requests} # Send responses and get new events - events = await _drain(workflow.send_responses_streaming(responses)) + events = await _drain(workflow.run_stream(responses=responses)) pending_requests = _handle_events(events) """ diff --git a/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py b/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py index 5e92a6325c..a5f188ad69 100644 --- a/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py +++ b/python/samples/getting_started/workflows/orchestration/handoff_specialist_to_specialist.py @@ -205,7 +205,7 @@ async def main() -> None: print(f"\n[User]: {user_response}\n") responses = {req.request_id: user_response for req in pending_requests} - events = await _drain(workflow.send_responses_streaming(responses)) + events = await _drain(workflow.run_stream(responses=responses)) pending_requests = _handle_events(events) response_index += 1 diff --git a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py index fcd6d760ef..70bb1e2b8a 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_checkpoint.py @@ -31,7 +31,7 @@ must keep stable IDs so the checkpoint state aligns when we rebuild the graph. 2. 
**Executor snapshotting** - checkpoints capture the `RequestInfoExecutor` state, specifically the pending plan-review request map, at superstep boundaries. -3. **Resume with responses** - `Workflow.run_stream_from_checkpoint` accepts a +3. **Resume with responses** - `Workflow.run_stream` accepts a `checkpoint_id` and `responses` mapping so we can inject the stored human reply during restoration. Prerequisites: @@ -137,12 +137,12 @@ async def main() -> None: approval = MagenticPlanReviewReply(decision=MagenticPlanReviewDecision.APPROVE) # Resume execution and supply the recorded approval in a single call. - # `run_stream_from_checkpoint` rebuilds executor state, applies the provided responses, + # `run_stream` rebuilds executor state, applies the provided responses, # and then continues the workflow. Because we only captured the initial plan review # checkpoint, the resumed run should complete almost immediately. final_event: WorkflowOutputEvent | None = None - async for event in resumed_workflow.run_stream_from_checkpoint( - resume_checkpoint.checkpoint_id, + async for event in resumed_workflow.run_stream( + checkpoint_id=resume_checkpoint.checkpoint_id, responses={plan_review_request_id: approval}, ): if isinstance(event, WorkflowOutputEvent): @@ -204,8 +204,8 @@ def _pending_message_count(cp: WorkflowCheckpoint) -> int: final_event_post: WorkflowOutputEvent | None = None post_emitted_events = False post_plan_workflow = build_workflow(checkpoint_storage) - async for event in post_plan_workflow.run_stream_from_checkpoint( - post_plan_checkpoint.checkpoint_id, + async for event in post_plan_workflow.run_stream( + checkpoint_id=post_plan_checkpoint.checkpoint_id, responses={}, ): post_emitted_events = True diff --git a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py index 5ba8b5cc23..3c42599f86 100644 --- a/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py +++ b/python/samples/getting_started/workflows/orchestration/magentic_human_plan_update.py @@ -111,7 +111,7 @@ def on_exception(exception: Exception) -> None: while not completed: # Use streaming for both initial run and response sending if pending_responses is not None: - stream = workflow.send_responses_streaming(pending_responses) + stream = workflow.run_stream(responses=pending_responses) else: stream = workflow.run_stream(task) diff --git a/python/samples/semantic-kernel-migration/orchestrations/handoff.py b/python/samples/semantic-kernel-migration/orchestrations/handoff.py index 2bf1f73665..96c12b1da1 100644 --- a/python/samples/semantic-kernel-migration/orchestrations/handoff.py +++ b/python/samples/semantic-kernel-migration/orchestrations/handoff.py @@ -255,7 +255,7 @@ async def run_agent_framework_example(initial_task: str, scripted_responses: Seq except StopIteration: user_reply = "Thanks, that's all." responses = {request.request_id: user_reply for request in pending} - final_events = await _drain_events(workflow.send_responses_streaming(responses)) + final_events = await _drain_events(workflow.run_stream(responses=responses)) pending = _collect_handoff_requests(final_events) conversation = _extract_final_conversation(final_events)
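Taken together, checkpoint resume now also goes through the same entry point: pass checkpoint_id (plus an optional runtime checkpoint_storage and responses) to run_stream or run. A minimal resume sketch, assuming the FileCheckpointStorage import path used by the tests above and placeholder ids/values:

from typing import Any

from agent_framework import WorkflowOutputEvent  # assumed public export
from agent_framework._workflows._checkpoint import FileCheckpointStorage


async def resume_with_reply(workflow: Any, checkpoint_id: str, request_id: str, reply: Any) -> Any:
    """Resume a paused workflow from a checkpoint and pre-supply one pending human reply."""
    # Runtime storage overrides any storage configured at build time via with_checkpointing().
    storage = FileCheckpointStorage("./checkpoints")
    output: Any = None
    async for event in workflow.run_stream(
        checkpoint_id=checkpoint_id,    # restore state saved at a superstep boundary
        checkpoint_storage=storage,     # runtime override; build-time configuration not required
        responses={request_id: reply},  # injected during restoration, as in magentic_checkpoint.py
    ):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data
    return output

This mirrors magentic_checkpoint.py and sub_workflow_checkpoint.py after this change, which both pass checkpoint_id and responses to the single run_stream entry point instead of run_stream_from_checkpoint.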