From 0ceb48962ead60fb7aca0828d708f7627ca307c9 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Sat, 17 Jan 2026 01:25:10 +0100 Subject: [PATCH 1/5] feat: Workflow Control Plane support Adds workflow control plane methods and LangGraph adapter for governance gates at workflow step transitions. ## Features - Workflow control methods: create_workflow, get_workflow, step_gate, etc. - LangGraph adapter (AxonFlowLangGraphAdapter) for simplified integration - WorkflowBlockedError and WorkflowApprovalRequiredError exceptions ## New Files - axonflow/workflow.py - Pydantic models for workflow types - axonflow/adapters/langgraph.py - LangGraph adapter - axonflow/adapters/__init__.py - Adapter exports Related to getaxonflow/axonflow-enterprise#834 --- axonflow/__init__.py | 35 ++++ axonflow/adapters/__init__.py | 8 + axonflow/adapters/langgraph.py | 360 +++++++++++++++++++++++++++++++++ axonflow/client.py | 341 +++++++++++++++++++++++++++++++ axonflow/workflow.py | 196 ++++++++++++++++++ 5 files changed, 940 insertions(+) create mode 100644 axonflow/adapters/__init__.py create mode 100644 axonflow/adapters/langgraph.py create mode 100644 axonflow/workflow.py diff --git a/axonflow/__init__.py b/axonflow/__init__.py index f8acb20..2eeb164 100644 --- a/axonflow/__init__.py +++ b/axonflow/__init__.py @@ -127,6 +127,24 @@ UsageRecordsResponse, UsageSummary, ) +from axonflow.workflow import ( + AbortWorkflowRequest, + ApprovalStatus, + CreateWorkflowRequest, + CreateWorkflowResponse, + GateDecision, + ListWorkflowsOptions, + ListWorkflowsResponse, + MarkStepCompletedRequest, + PolicyMatch, + StepGateRequest, + StepGateResponse, + StepType, + WorkflowSource, + WorkflowStatus, + WorkflowStatusResponse, + WorkflowStepInfo, +) __version__ = "1.2.0" __all__ = [ @@ -236,4 +254,21 @@ "ModelPricing", "PricingInfo", "PricingListResponse", + # Workflow Control Plane types + "WorkflowStatus", + "WorkflowSource", + "GateDecision", + "ApprovalStatus", + "StepType", + "CreateWorkflowRequest", + "CreateWorkflowResponse", + "StepGateRequest", + "StepGateResponse", + "WorkflowStepInfo", + "WorkflowStatusResponse", + "ListWorkflowsOptions", + "ListWorkflowsResponse", + "MarkStepCompletedRequest", + "AbortWorkflowRequest", + "PolicyMatch", ] diff --git a/axonflow/adapters/__init__.py b/axonflow/adapters/__init__.py new file mode 100644 index 0000000..7007da1 --- /dev/null +++ b/axonflow/adapters/__init__.py @@ -0,0 +1,8 @@ +"""AxonFlow adapters for external orchestrators.""" + +from axonflow.adapters.langgraph import AxonFlowLangGraphAdapter, WorkflowBlockedError + +__all__ = [ + "AxonFlowLangGraphAdapter", + "WorkflowBlockedError", +] diff --git a/axonflow/adapters/langgraph.py b/axonflow/adapters/langgraph.py new file mode 100644 index 0000000..a4351c6 --- /dev/null +++ b/axonflow/adapters/langgraph.py @@ -0,0 +1,360 @@ +"""LangGraph Adapter for AxonFlow Workflow Control Plane. + +This adapter wraps LangGraph workflows with AxonFlow governance gates, +providing policy enforcement at step transitions. + +"LangGraph runs the workflow. AxonFlow decides when it's allowed to move forward." + +Example: + >>> from langgraph.graph import StateGraph + >>> from axonflow import AxonFlow + >>> from axonflow.adapters import AxonFlowLangGraphAdapter + >>> + >>> # Create your LangGraph workflow + >>> graph = StateGraph(...) + >>> + >>> # Wrap with AxonFlow governance + >>> async with AxonFlow(endpoint="http://localhost:8080") as client: + ... adapter = AxonFlowLangGraphAdapter(client, "my-workflow") + ... + ... 
# Start workflow and register with AxonFlow + ... await adapter.start_workflow() + ... + ... # Before each step, check the gate + ... if await adapter.check_gate("generate_code", "llm_call", model="gpt-4"): + ... # Execute the step + ... result = await execute_step() + ... await adapter.step_completed("generate_code") + ... + ... # Complete workflow + ... await adapter.complete_workflow() +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from axonflow.workflow import ( + CreateWorkflowRequest, + GateDecision, + MarkStepCompletedRequest, + StepGateRequest, + StepType, + WorkflowSource, +) + +if TYPE_CHECKING: + from axonflow import AxonFlow + + +class WorkflowBlockedError(Exception): + """Raised when a workflow step is blocked by policy.""" + + def __init__( + self, + message: str, + step_id: str | None = None, + reason: str | None = None, + policy_ids: list[str] | None = None, + ) -> None: + super().__init__(message) + self.step_id = step_id + self.reason = reason + self.policy_ids = policy_ids or [] + + +class WorkflowApprovalRequiredError(Exception): + """Raised when a workflow step requires approval.""" + + def __init__( + self, + message: str, + step_id: str | None = None, + approval_url: str | None = None, + reason: str | None = None, + ) -> None: + super().__init__(message) + self.step_id = step_id + self.approval_url = approval_url + self.reason = reason + + +class AxonFlowLangGraphAdapter: + """Wraps LangGraph workflows with AxonFlow governance gates. + + This adapter provides a simple interface for integrating AxonFlow's + Workflow Control Plane with LangGraph workflows. It handles workflow + registration, step gate checks, and workflow lifecycle management. + + Attributes: + client: AxonFlow client instance + workflow_name: Name of the workflow + workflow_id: ID assigned after workflow creation (None until started) + source: Workflow source (defaults to langgraph) + + Example: + >>> adapter = AxonFlowLangGraphAdapter(client, "code-review-pipeline") + >>> await adapter.start_workflow(total_steps=5) + >>> + >>> # Before each LangGraph node execution + >>> if await adapter.check_gate("analyze", "llm_call"): + ... result = await analyze_code(state) + ... await adapter.step_completed("analyze") + """ + + def __init__( + self, + client: AxonFlow, + workflow_name: str, + source: WorkflowSource = WorkflowSource.LANGGRAPH, + *, + auto_block: bool = True, + ) -> None: + """Initialize the LangGraph adapter. + + Args: + client: AxonFlow client instance + workflow_name: Human-readable name for the workflow + source: Workflow source (defaults to langgraph) + auto_block: If True, check_gate raises WorkflowBlockedError on block + If False, returns False and caller handles it + """ + self.client = client + self.workflow_name = workflow_name + self.source = source + self.workflow_id: str | None = None + self._step_counter = 0 + self._auto_block = auto_block + + async def start_workflow( + self, + total_steps: int | None = None, + metadata: dict[str, Any] | None = None, + ) -> str: + """Register the workflow with AxonFlow. + + Call this at the start of your LangGraph workflow execution. + + Args: + total_steps: Total number of steps (if known) + metadata: Additional workflow metadata + + Returns: + The assigned workflow ID + + Example: + >>> workflow_id = await adapter.start_workflow( + ... total_steps=5, + ... metadata={"customer_id": "cust-123"} + ... 
) + """ + request = CreateWorkflowRequest( + workflow_name=self.workflow_name, + source=self.source, + total_steps=total_steps, + metadata=metadata or {}, + ) + + response = await self.client.create_workflow(request) + self.workflow_id = response.workflow_id + return self.workflow_id + + async def check_gate( + self, + step_name: str, + step_type: str | StepType, + *, + step_id: str | None = None, + step_input: dict[str, Any] | None = None, + model: str | None = None, + provider: str | None = None, + ) -> bool: + """Check if a step is allowed to proceed. + + Call this before executing each LangGraph node to check policy approval. + + Args: + step_name: Human-readable step name + step_type: Type of step (llm_call, tool_call, connector_call, human_task) + step_id: Optional step ID (auto-generated if not provided) + step_input: Input data for the step (for policy evaluation) + model: LLM model being used + provider: LLM provider being used + + Returns: + True if step is allowed, False if blocked (when auto_block=False) + + Raises: + WorkflowBlockedError: If step is blocked and auto_block=True + WorkflowApprovalRequiredError: If step requires approval + ValueError: If workflow not started + + Example: + >>> if await adapter.check_gate("generate", "llm_call", model="gpt-4"): + ... result = await generate_code(state) + """ + if not self.workflow_id: + raise ValueError("Workflow not started. Call start_workflow() first.") + + # Convert string to StepType if needed + if isinstance(step_type, str): + step_type = StepType(step_type) + + # Generate step ID if not provided + if step_id is None: + self._step_counter += 1 + step_id = f"step-{self._step_counter}-{step_name.lower().replace(' ', '-')}" + + request = StepGateRequest( + step_name=step_name, + step_type=step_type, + step_input=step_input or {}, + model=model, + provider=provider, + ) + + response = await self.client.step_gate(self.workflow_id, step_id, request) + + if response.decision == GateDecision.BLOCK: + if self._auto_block: + raise WorkflowBlockedError( + f"Step '{step_name}' blocked: {response.reason}", + step_id=response.step_id, + reason=response.reason, + policy_ids=response.policy_ids, + ) + return False + + if response.decision == GateDecision.REQUIRE_APPROVAL: + raise WorkflowApprovalRequiredError( + f"Step '{step_name}' requires approval", + step_id=response.step_id, + approval_url=response.approval_url, + reason=response.reason, + ) + + return True + + async def step_completed( + self, + step_name: str, + *, + step_id: str | None = None, + output: dict[str, Any] | None = None, + metadata: dict[str, Any] | None = None, + ) -> None: + """Mark a step as completed. + + Call this after successfully executing a LangGraph node. + + Args: + step_name: Step name (used to generate step_id if not provided) + step_id: Optional step ID (must match the one used in check_gate) + output: Output data from the step + metadata: Additional metadata + + Example: + >>> await adapter.step_completed("generate", output={"code": result}) + """ + if not self.workflow_id: + raise ValueError("Workflow not started. 
Call start_workflow() first.") + + # Generate step ID if not provided (must match check_gate) + if step_id is None: + step_id = f"step-{self._step_counter}-{step_name.lower().replace(' ', '-')}" + + request = MarkStepCompletedRequest( + output=output or {}, + metadata=metadata or {}, + ) + + await self.client.mark_step_completed(self.workflow_id, step_id, request) + + async def complete_workflow(self) -> None: + """Mark the workflow as completed. + + Call this when your LangGraph workflow finishes successfully. + + Example: + >>> await adapter.complete_workflow() + """ + if not self.workflow_id: + raise ValueError("Workflow not started. Call start_workflow() first.") + + await self.client.complete_workflow(self.workflow_id) + + async def abort_workflow(self, reason: str | None = None) -> None: + """Abort the workflow. + + Call this when your LangGraph workflow fails or is cancelled. + + Args: + reason: Reason for aborting + + Example: + >>> await adapter.abort_workflow("User cancelled the operation") + """ + if not self.workflow_id: + raise ValueError("Workflow not started. Call start_workflow() first.") + + await self.client.abort_workflow(self.workflow_id, reason) + + async def wait_for_approval( + self, + step_id: str, + *, + poll_interval: float = 5.0, + timeout: float = 300.0, + ) -> bool: + """Wait for a step to be approved. + + Poll the workflow status until the step is approved or rejected. + + Args: + step_id: Step ID to wait for + poll_interval: Seconds between polls + timeout: Maximum seconds to wait + + Returns: + True if approved, False if rejected + + Raises: + TimeoutError: If approval not received within timeout + """ + import asyncio + + if not self.workflow_id: + raise ValueError("Workflow not started. Call start_workflow() first.") + + elapsed = 0.0 + while elapsed < timeout: + status = await self.client.get_workflow(self.workflow_id) + + # Find the step + for step in status.steps: + if step.step_id == step_id: + if step.approval_status: + from axonflow.workflow import ApprovalStatus + + if step.approval_status == ApprovalStatus.APPROVED: + return True + if step.approval_status == ApprovalStatus.REJECTED: + return False + break + + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + raise TimeoutError(f"Approval timeout after {timeout}s for step {step_id}") + + async def __aenter__(self) -> AxonFlowLangGraphAdapter: + """Context manager entry.""" + return self + + async def __aexit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: Any) -> None: + """Context manager exit - abort if exception, complete otherwise.""" + if self.workflow_id: + if exc_type is not None: + await self.abort_workflow(f"Exception: {exc_val}") + else: + await self.complete_workflow() diff --git a/axonflow/client.py b/axonflow/client.py index d9c241b..37106fe 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -130,6 +130,23 @@ UsageRecordsResponse, UsageSummary, ) +from axonflow.workflow import ( + AbortWorkflowRequest, + ApprovalStatus, + CreateWorkflowRequest, + CreateWorkflowResponse, + GateDecision, + ListWorkflowsOptions, + ListWorkflowsResponse, + MarkStepCompletedRequest, + StepGateRequest, + StepGateResponse, + StepType, + WorkflowSource, + WorkflowStatus, + WorkflowStatusResponse, + WorkflowStepInfo, +) if TYPE_CHECKING: from types import TracebackType @@ -2839,6 +2856,330 @@ async def get_pricing( return PricingListResponse(pricing=[PricingInfo.model_validate(response)]) return PricingListResponse.model_validate(response) + # 
======================================== + # WORKFLOW CONTROL PLANE + # ======================================== + # The Workflow Control Plane provides governance gates for external + # orchestrators like LangChain, LangGraph, and CrewAI. + # + # "LangChain runs the workflow. AxonFlow decides when it's allowed to move forward." + # + # Usage: + # 1. Call create_workflow() to register a new workflow + # 2. Before each step, call step_gate() to check if the step is allowed + # 3. If decision is 'block', stop the workflow + # 4. If decision is 'require_approval', wait for approval + # 5. After each step, optionally call mark_step_completed() + # 6. Call complete_workflow() or abort_workflow() when done + + async def create_workflow( + self, + request: CreateWorkflowRequest, + ) -> CreateWorkflowResponse: + """Create a new workflow for governance tracking. + + Registers a new workflow with AxonFlow. Call this at the start of your + external orchestrator workflow (LangChain, LangGraph, CrewAI, etc.). + + Args: + request: Workflow creation request + + Returns: + Created workflow with ID + + Example: + >>> workflow = await client.create_workflow( + ... CreateWorkflowRequest( + ... workflow_name="customer-support-agent", + ... source=WorkflowSource.LANGGRAPH, + ... total_steps=5, + ... metadata={"customer_id": "cust-123"} + ... ) + ... ) + >>> print(f"Workflow created: {workflow.workflow_id}") + """ + body = { + "workflow_name": request.workflow_name, + "source": request.source.value if request.source else "external", + "total_steps": request.total_steps, + "metadata": request.metadata, + } + + if self._config.debug: + self._logger.debug("Creating workflow", workflow_name=request.workflow_name) + + response = await self._orchestrator_request("POST", "/api/v1/workflows", json_data=body) + + return CreateWorkflowResponse( + workflow_id=response["workflow_id"], + workflow_name=response["workflow_name"], + source=WorkflowSource(response["source"]), + status=WorkflowStatus(response["status"]), + created_at=_parse_datetime(response["created_at"]), + ) + + async def get_workflow(self, workflow_id: str) -> WorkflowStatusResponse: + """Get the status of a workflow. + + Args: + workflow_id: Workflow ID + + Returns: + Workflow status including steps + + Example: + >>> status = await client.get_workflow("wf_123") + >>> print(f"Status: {status.status}, Step: {status.current_step_index}") + """ + response = await self._orchestrator_request("GET", f"/api/v1/workflows/{workflow_id}") + return self._map_workflow_response(response) + + async def step_gate( + self, + workflow_id: str, + step_id: str, + request: StepGateRequest, + ) -> StepGateResponse: + """Check if a workflow step is allowed to proceed (step gate). + + This is the core governance method. Call this before executing each step + in your workflow to check if the step is allowed based on policies. + + Args: + workflow_id: Workflow ID + step_id: Unique step identifier (you provide this) + request: Step gate request with step details + + Returns: + Gate decision: allow, block, or require_approval + + Example: + >>> gate = await client.step_gate( + ... "wf_123", + ... "step-generate-code", + ... StepGateRequest( + ... step_name="Generate Code", + ... step_type=StepType.LLM_CALL, + ... model="gpt-4", + ... provider="openai" + ... ) + ... ) + >>> if gate.decision == GateDecision.BLOCK: + ... raise Exception(f"Step blocked: {gate.reason}") + >>> elif gate.decision == GateDecision.REQUIRE_APPROVAL: + ... 
print(f"Waiting for approval: {gate.approval_url}") + """ + body = { + "step_name": request.step_name, + "step_type": request.step_type.value, + "step_input": request.step_input, + "model": request.model, + "provider": request.provider, + } + + if self._config.debug: + self._logger.debug( + "Checking step gate", + workflow_id=workflow_id, + step_id=step_id, + step_type=request.step_type.value, + ) + + response = await self._orchestrator_request( + "POST", + f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate", + json_data=body, + ) + + return StepGateResponse( + decision=GateDecision(response["decision"]), + step_id=response["step_id"], + reason=response.get("reason"), + policy_ids=response.get("policy_ids", []), + approval_url=response.get("approval_url"), + ) + + async def mark_step_completed( + self, + workflow_id: str, + step_id: str, + request: MarkStepCompletedRequest | None = None, + ) -> None: + """Mark a step as completed. + + Call this after successfully executing a step to record its completion. + + Args: + workflow_id: Workflow ID + step_id: Step ID + request: Optional completion request with output data + + Example: + >>> await client.mark_step_completed( + ... "wf_123", + ... "step-1", + ... MarkStepCompletedRequest(output={"result": "Code generated"}) + ... ) + """ + body = {} + if request: + body = {"output": request.output, "metadata": request.metadata} + + await self._orchestrator_request( + "POST", + f"/api/v1/workflows/{workflow_id}/steps/{step_id}/complete", + json_data=body, + ) + + if self._config.debug: + self._logger.debug("Step marked completed", workflow_id=workflow_id, step_id=step_id) + + async def complete_workflow(self, workflow_id: str) -> None: + """Complete a workflow successfully. + + Call this when your workflow has completed all steps successfully. + + Args: + workflow_id: Workflow ID + + Example: + >>> await client.complete_workflow("wf_123") + """ + await self._orchestrator_request( + "POST", + f"/api/v1/workflows/{workflow_id}/complete", + json_data={}, + ) + + if self._config.debug: + self._logger.debug("Workflow completed", workflow_id=workflow_id) + + async def abort_workflow(self, workflow_id: str, reason: str | None = None) -> None: + """Abort a workflow. + + Call this when you need to stop a workflow due to an error or user request. + + Args: + workflow_id: Workflow ID + reason: Optional reason for aborting + + Example: + >>> await client.abort_workflow("wf_123", "User cancelled the operation") + """ + body = {"reason": reason} if reason else {} + + await self._orchestrator_request( + "POST", + f"/api/v1/workflows/{workflow_id}/abort", + json_data=body, + ) + + if self._config.debug: + self._logger.debug("Workflow aborted", workflow_id=workflow_id, reason=reason) + + async def resume_workflow(self, workflow_id: str) -> None: + """Resume a workflow after approval. + + Call this after a step has been approved to continue the workflow. + + Args: + workflow_id: Workflow ID + + Example: + >>> # After approval received via webhook or polling + >>> await client.resume_workflow("wf_123") + """ + await self._orchestrator_request( + "POST", + f"/api/v1/workflows/{workflow_id}/resume", + json_data={}, + ) + + if self._config.debug: + self._logger.debug("Workflow resumed", workflow_id=workflow_id) + + async def list_workflows( + self, + options: ListWorkflowsOptions | None = None, + ) -> ListWorkflowsResponse: + """List workflows with optional filters. 
+ + Args: + options: Filter and pagination options + + Returns: + List of workflows + + Example: + >>> result = await client.list_workflows( + ... ListWorkflowsOptions( + ... status=WorkflowStatus.IN_PROGRESS, + ... source=WorkflowSource.LANGGRAPH, + ... limit=10 + ... ) + ... ) + >>> print(f"Found {result.total} workflows") + """ + params: list[str] = [] + if options: + if options.status: + params.append(f"status={options.status.value}") + if options.source: + params.append(f"source={options.source.value}") + if options.limit: + params.append(f"limit={options.limit}") + if options.offset: + params.append(f"offset={options.offset}") + + path = "/api/v1/workflows" + if params: + path = f"{path}?{'&'.join(params)}" + + response = await self._orchestrator_request("GET", path) + + workflows = [self._map_workflow_response(w) for w in response.get("workflows", [])] + + return ListWorkflowsResponse( + workflows=workflows, + total=response.get("total", len(workflows)), + ) + + def _map_workflow_response(self, data: dict[str, Any]) -> WorkflowStatusResponse: + """Map API response to WorkflowStatusResponse.""" + steps = [] + if data.get("steps"): + for s in data["steps"]: + steps.append( + WorkflowStepInfo( + step_id=s["step_id"], + step_index=s["step_index"], + step_name=s.get("step_name"), + step_type=StepType(s["step_type"]), + decision=GateDecision(s["decision"]), + decision_reason=s.get("decision_reason"), + approval_status=ApprovalStatus(s["approval_status"]) + if s.get("approval_status") + else None, + approved_by=s.get("approved_by"), + gate_checked_at=_parse_datetime(s["gate_checked_at"]), + completed_at=_parse_datetime(s["completed_at"]) + if s.get("completed_at") + else None, + ) + ) + + return WorkflowStatusResponse( + workflow_id=data["workflow_id"], + workflow_name=data["workflow_name"], + source=WorkflowSource(data["source"]), + status=WorkflowStatus(data["status"]), + current_step_index=data.get("current_step_index", 0), + total_steps=data.get("total_steps"), + started_at=_parse_datetime(data["started_at"]), + completed_at=_parse_datetime(data["completed_at"]) if data.get("completed_at") else None, + steps=steps, + ) + class SyncAxonFlow: """Synchronous wrapper for AxonFlow client. diff --git a/axonflow/workflow.py b/axonflow/workflow.py new file mode 100644 index 0000000..43ce5c0 --- /dev/null +++ b/axonflow/workflow.py @@ -0,0 +1,196 @@ +"""Workflow Control Plane Types for AxonFlow SDK. + +The Workflow Control Plane provides governance gates for external orchestrators +like LangChain, LangGraph, and CrewAI. These types define the request/response +structures for registering workflows, checking step gates, and managing workflow +lifecycle. + +"LangChain runs the workflow. AxonFlow decides when it's allowed to move forward." 
+""" + +from __future__ import annotations + +from datetime import datetime +from enum import Enum +from typing import Any + +from pydantic import BaseModel, ConfigDict, Field + + +class WorkflowStatus(str, Enum): + """Workflow status values.""" + + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + ABORTED = "aborted" + FAILED = "failed" + + +class WorkflowSource(str, Enum): + """Source of the workflow (which orchestrator is running it).""" + + LANGGRAPH = "langgraph" + LANGCHAIN = "langchain" + CREWAI = "crewai" + EXTERNAL = "external" + + +class GateDecision(str, Enum): + """Gate decision values returned by step gate checks.""" + + ALLOW = "allow" + BLOCK = "block" + REQUIRE_APPROVAL = "require_approval" + + +class ApprovalStatus(str, Enum): + """Approval status for steps requiring human approval.""" + + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + + +class StepType(str, Enum): + """Step type indicating what kind of operation the step performs.""" + + LLM_CALL = "llm_call" + TOOL_CALL = "tool_call" + CONNECTOR_CALL = "connector_call" + HUMAN_TASK = "human_task" + + +class CreateWorkflowRequest(BaseModel): + """Request to create a new workflow.""" + + model_config = ConfigDict(frozen=True) + + workflow_name: str = Field(..., min_length=1, description="Human-readable name for the workflow") + source: WorkflowSource | None = Field( + default=None, description="Source orchestrator running the workflow" + ) + total_steps: int | None = Field( + default=None, ge=0, description="Total number of steps in the workflow (if known)" + ) + metadata: dict[str, Any] = Field( + default_factory=dict, description="Additional metadata for the workflow" + ) + + +class CreateWorkflowResponse(BaseModel): + """Response from creating a workflow.""" + + workflow_id: str = Field(..., description="Unique identifier for the workflow") + workflow_name: str = Field(..., description="Name of the workflow") + source: WorkflowSource = Field(..., description="Source orchestrator") + status: WorkflowStatus = Field(..., description="Current status (always 'in_progress' for new)") + created_at: datetime = Field(..., description="When the workflow was created") + + +class StepGateRequest(BaseModel): + """Request to check if a step is allowed to proceed.""" + + model_config = ConfigDict(frozen=True) + + step_name: str | None = Field(default=None, description="Human-readable name for the step") + step_type: StepType = Field(..., description="Type of step being executed") + step_input: dict[str, Any] = Field( + default_factory=dict, description="Input data for the step (for policy evaluation)" + ) + model: str | None = Field(default=None, description="LLM model being used (if applicable)") + provider: str | None = Field(default=None, description="LLM provider (if applicable)") + + +class StepGateResponse(BaseModel): + """Response from a step gate check.""" + + decision: GateDecision = Field(..., description="The gate decision: allow, block, or require_approval") + step_id: str = Field(..., description="Unique step ID assigned by the system") + reason: str | None = Field( + default=None, description="Reason for the decision (especially for block/require_approval)" + ) + policy_ids: list[str] = Field( + default_factory=list, description="IDs of policies that matched and influenced the decision" + ) + approval_url: str | None = Field( + default=None, description="URL to the approval portal (if decision is require_approval)" + ) + + +class WorkflowStepInfo(BaseModel): + """Information about a workflow 
step.""" + + step_id: str = Field(..., description="Unique step identifier") + step_index: int = Field(..., ge=0, description="Step index in the workflow") + step_name: str | None = Field(default=None, description="Step name") + step_type: StepType = Field(..., description="Step type") + decision: GateDecision = Field(..., description="Gate decision for this step") + decision_reason: str | None = Field(default=None, description="Reason for the decision") + approval_status: ApprovalStatus | None = Field( + default=None, description="Approval status (if require_approval decision)" + ) + approved_by: str | None = Field(default=None, description="Who approved the step (if approved)") + gate_checked_at: datetime = Field(..., description="When the gate was checked") + completed_at: datetime | None = Field(default=None, description="When the step was completed") + + +class WorkflowStatusResponse(BaseModel): + """Response containing workflow status.""" + + workflow_id: str = Field(..., description="Workflow ID") + workflow_name: str = Field(..., description="Workflow name") + source: WorkflowSource = Field(..., description="Source orchestrator") + status: WorkflowStatus = Field(..., description="Current status") + current_step_index: int = Field(default=0, ge=0, description="Current step index (0-based)") + total_steps: int | None = Field(default=None, ge=0, description="Total steps in the workflow") + started_at: datetime = Field(..., description="When the workflow started") + completed_at: datetime | None = Field( + default=None, description="When the workflow completed (if completed)" + ) + steps: list[WorkflowStepInfo] = Field( + default_factory=list, description="List of steps in the workflow" + ) + + +class ListWorkflowsOptions(BaseModel): + """Options for listing workflows.""" + + model_config = ConfigDict(frozen=True) + + status: WorkflowStatus | None = Field(default=None, description="Filter by workflow status") + source: WorkflowSource | None = Field(default=None, description="Filter by source") + limit: int = Field(default=50, ge=1, le=100, description="Maximum number of results to return") + offset: int = Field(default=0, ge=0, description="Offset for pagination") + + +class ListWorkflowsResponse(BaseModel): + """Response from listing workflows.""" + + workflows: list[WorkflowStatusResponse] = Field(default_factory=list, description="List of workflows") + total: int = Field(default=0, ge=0, description="Total count (for pagination)") + + +class MarkStepCompletedRequest(BaseModel): + """Request to mark a step as completed.""" + + model_config = ConfigDict(frozen=True) + + output: dict[str, Any] = Field(default_factory=dict, description="Output data from the step") + metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata") + + +class AbortWorkflowRequest(BaseModel): + """Request to abort a workflow.""" + + model_config = ConfigDict(frozen=True) + + reason: str | None = Field(default=None, description="Reason for aborting the workflow") + + +class PolicyMatch(BaseModel): + """Policy match information.""" + + policy_id: str = Field(..., description="Policy ID that matched") + policy_name: str = Field(..., description="Policy name") + action: str = Field(..., description="Action taken by the policy") + reason: str | None = Field(default=None, description="Reason for the match") From 1a03a38dbd37bb0b60815cf768b54193c76c288a Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Sat, 17 Jan 2026 13:48:43 +0100 Subject: [PATCH 2/5] fix: resolve E501 line length lint 
errors --- CHANGELOG.md | 25 +++++++++++++++++++++++++ axonflow/adapters/langgraph.py | 4 +++- axonflow/client.py | 21 +++++++++++++++------ axonflow/workflow.py | 32 +++++++++++++++++++++++++++++--- 4 files changed, 72 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d66090e..dd59ae4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,31 @@ All notable changes to the AxonFlow Python SDK will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.5.0] - 2026-01-17 + +### Added + +- **Workflow Control Plane** (Issue #834): Governance gates for external orchestrators + - "LangChain runs the workflow. AxonFlow decides when it's allowed to move forward." + - `create_workflow()` - Register workflows from LangChain/LangGraph/CrewAI/external + - `step_gate()` - Check if step is allowed to proceed (allow/block/require_approval) + - `mark_step_completed()` - Mark a step as completed with optional output data + - `get_workflow()` - Get workflow status and step history + - `list_workflows()` - List workflows with filters (status, source, pagination) + - `complete_workflow()` - Mark workflow as completed + - `abort_workflow()` - Abort workflow with reason + - `resume_workflow()` - Resume after approval + - New types: `WorkflowStatus`, `WorkflowSource`, `GateDecision`, `StepType`, `ApprovalStatus`, `MarkStepCompletedRequest` + - Helper methods on `StepGateResponse`: `is_allowed()`, `is_blocked()`, `requires_approval()` + - Helper methods on `WorkflowStatus` and `WorkflowStatusResponse`: `is_terminal()` + - LangGraph adapter: `axonflow.adapters.langgraph.AxonFlowLangGraphAdapter` + +### Fixed + +- Datetime parsing now handles variable-length fractional seconds (e.g., 5 digits) for Python 3.9 compatibility + +--- + ## [1.4.0] - 2026-01-14 ### Added diff --git a/axonflow/adapters/langgraph.py b/axonflow/adapters/langgraph.py index a4351c6..ec45e28 100644 --- a/axonflow/adapters/langgraph.py +++ b/axonflow/adapters/langgraph.py @@ -351,7 +351,9 @@ async def __aenter__(self) -> AxonFlowLangGraphAdapter: """Context manager entry.""" return self - async def __aexit__(self, exc_type: type | None, exc_val: Exception | None, exc_tb: Any) -> None: + async def __aexit__( + self, exc_type: type | None, exc_val: Exception | None, exc_tb: Any + ) -> None: """Context manager exit - abort if exception, complete otherwise.""" if self.workflow_id: if exc_type is not None: diff --git a/axonflow/client.py b/axonflow/client.py index 37106fe..90480c8 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -160,15 +160,22 @@ def _parse_datetime(value: str) -> datetime: Python 3.9's fromisoformat() doesn't handle 'Z' suffix for UTC. This helper replaces 'Z' with '+00:00' for compatibility. - Also handles nanosecond precision (9 digits) by truncating to microseconds (6 digits) - since Python's fromisoformat() only supports up to 6 fractional digits. + Also normalizes fractional seconds to exactly 6 digits (microseconds) + since Python 3.9's fromisoformat() requires 0, 3, or 6 fractional digits. 
""" if value.endswith("Z"): value = value[:-1] + "+00:00" - # Python's fromisoformat only supports up to 6 fractional digits (microseconds) - # Truncate nanoseconds (9 digits) to microseconds (6 digits) if needed - value = re.sub(r"(\.\d{6})\d+", r"\1", value) + # Normalize fractional seconds to exactly 6 digits for Python 3.9 compatibility + # Handles cases like .35012 (5 digits) -> .350120, or .123456789 (9 digits) -> .123456 + def normalize_fractional_seconds(match: re.Match) -> str: + frac = match.group(1) + suffix = match.group(2) + # Pad with zeros if less than 6 digits, truncate if more than 6 + normalized = frac[:6].ljust(6, "0") + return f".{normalized}{suffix}" + + value = re.sub(r"\.(\d+)([+-]|$)", normalize_fractional_seconds, value) return datetime.fromisoformat(value) @@ -3176,7 +3183,9 @@ def _map_workflow_response(self, data: dict[str, Any]) -> WorkflowStatusResponse current_step_index=data.get("current_step_index", 0), total_steps=data.get("total_steps"), started_at=_parse_datetime(data["started_at"]), - completed_at=_parse_datetime(data["completed_at"]) if data.get("completed_at") else None, + completed_at=( + _parse_datetime(data["completed_at"]) if data.get("completed_at") else None + ), steps=steps, ) diff --git a/axonflow/workflow.py b/axonflow/workflow.py index 43ce5c0..a8d6cac 100644 --- a/axonflow/workflow.py +++ b/axonflow/workflow.py @@ -25,6 +25,10 @@ class WorkflowStatus(str, Enum): ABORTED = "aborted" FAILED = "failed" + def is_terminal(self) -> bool: + """Check if the workflow status is terminal (completed, aborted, or failed).""" + return self in (WorkflowStatus.COMPLETED, WorkflowStatus.ABORTED, WorkflowStatus.FAILED) + class WorkflowSource(str, Enum): """Source of the workflow (which orchestrator is running it).""" @@ -65,7 +69,9 @@ class CreateWorkflowRequest(BaseModel): model_config = ConfigDict(frozen=True) - workflow_name: str = Field(..., min_length=1, description="Human-readable name for the workflow") + workflow_name: str = Field( + ..., min_length=1, description="Human-readable name for the workflow" + ) source: WorkflowSource | None = Field( default=None, description="Source orchestrator running the workflow" ) @@ -104,7 +110,9 @@ class StepGateRequest(BaseModel): class StepGateResponse(BaseModel): """Response from a step gate check.""" - decision: GateDecision = Field(..., description="The gate decision: allow, block, or require_approval") + decision: GateDecision = Field( + ..., description="The gate decision: allow, block, or require_approval" + ) step_id: str = Field(..., description="Unique step ID assigned by the system") reason: str | None = Field( default=None, description="Reason for the decision (especially for block/require_approval)" @@ -116,6 +124,18 @@ class StepGateResponse(BaseModel): default=None, description="URL to the approval portal (if decision is require_approval)" ) + def is_allowed(self) -> bool: + """Check if the step is allowed to proceed.""" + return self.decision == GateDecision.ALLOW + + def is_blocked(self) -> bool: + """Check if the step is blocked by policy.""" + return self.decision == GateDecision.BLOCK + + def requires_approval(self) -> bool: + """Check if the step requires human approval.""" + return self.decision == GateDecision.REQUIRE_APPROVAL + class WorkflowStepInfo(BaseModel): """Information about a workflow step.""" @@ -151,6 +171,10 @@ class WorkflowStatusResponse(BaseModel): default_factory=list, description="List of steps in the workflow" ) + def is_terminal(self) -> bool: + """Check if the workflow 
is in a terminal state (completed, aborted, or failed).""" + return self.status.is_terminal() + class ListWorkflowsOptions(BaseModel): """Options for listing workflows.""" @@ -166,7 +190,9 @@ class ListWorkflowsOptions(BaseModel): class ListWorkflowsResponse(BaseModel): """Response from listing workflows.""" - workflows: list[WorkflowStatusResponse] = Field(default_factory=list, description="List of workflows") + workflows: list[WorkflowStatusResponse] = Field( + default_factory=list, description="List of workflows" + ) total: int = Field(default=0, ge=0, description="Total count (for pagination)") From 0acf88710f9817ae3f0823f4beb60c67f5b6a8a5 Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Sat, 17 Jan 2026 13:53:06 +0100 Subject: [PATCH 3/5] fix: resolve remaining lint errors (imports, f-string exceptions) --- axonflow/adapters/langgraph.py | 30 ++++++++++++++++++------------ axonflow/client.py | 1 - 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/axonflow/adapters/langgraph.py b/axonflow/adapters/langgraph.py index ec45e28..6b51edf 100644 --- a/axonflow/adapters/langgraph.py +++ b/axonflow/adapters/langgraph.py @@ -32,9 +32,11 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, Any from axonflow.workflow import ( + ApprovalStatus, CreateWorkflowRequest, GateDecision, MarkStepCompletedRequest, @@ -194,7 +196,8 @@ async def check_gate( ... result = await generate_code(state) """ if not self.workflow_id: - raise ValueError("Workflow not started. Call start_workflow() first.") + msg = "Workflow not started. Call start_workflow() first." + raise ValueError(msg) # Convert string to StepType if needed if isinstance(step_type, str): @@ -217,8 +220,9 @@ async def check_gate( if response.decision == GateDecision.BLOCK: if self._auto_block: + msg = f"Step '{step_name}' blocked: {response.reason}" raise WorkflowBlockedError( - f"Step '{step_name}' blocked: {response.reason}", + msg, step_id=response.step_id, reason=response.reason, policy_ids=response.policy_ids, @@ -226,8 +230,9 @@ async def check_gate( return False if response.decision == GateDecision.REQUIRE_APPROVAL: + msg = f"Step '{step_name}' requires approval" raise WorkflowApprovalRequiredError( - f"Step '{step_name}' requires approval", + msg, step_id=response.step_id, approval_url=response.approval_url, reason=response.reason, @@ -257,7 +262,8 @@ async def step_completed( >>> await adapter.step_completed("generate", output={"code": result}) """ if not self.workflow_id: - raise ValueError("Workflow not started. Call start_workflow() first.") + msg = "Workflow not started. Call start_workflow() first." + raise ValueError(msg) # Generate step ID if not provided (must match check_gate) if step_id is None: @@ -279,7 +285,8 @@ async def complete_workflow(self) -> None: >>> await adapter.complete_workflow() """ if not self.workflow_id: - raise ValueError("Workflow not started. Call start_workflow() first.") + msg = "Workflow not started. Call start_workflow() first." + raise ValueError(msg) await self.client.complete_workflow(self.workflow_id) @@ -295,7 +302,8 @@ async def abort_workflow(self, reason: str | None = None) -> None: >>> await adapter.abort_workflow("User cancelled the operation") """ if not self.workflow_id: - raise ValueError("Workflow not started. Call start_workflow() first.") + msg = "Workflow not started. Call start_workflow() first." 
+ raise ValueError(msg) await self.client.abort_workflow(self.workflow_id, reason) @@ -321,10 +329,9 @@ async def wait_for_approval( Raises: TimeoutError: If approval not received within timeout """ - import asyncio - if not self.workflow_id: - raise ValueError("Workflow not started. Call start_workflow() first.") + msg = "Workflow not started. Call start_workflow() first." + raise ValueError(msg) elapsed = 0.0 while elapsed < timeout: @@ -334,8 +341,6 @@ async def wait_for_approval( for step in status.steps: if step.step_id == step_id: if step.approval_status: - from axonflow.workflow import ApprovalStatus - if step.approval_status == ApprovalStatus.APPROVED: return True if step.approval_status == ApprovalStatus.REJECTED: @@ -345,7 +350,8 @@ async def wait_for_approval( await asyncio.sleep(poll_interval) elapsed += poll_interval - raise TimeoutError(f"Approval timeout after {timeout}s for step {step_id}") + msg = f"Approval timeout after {timeout}s for step {step_id}" + raise TimeoutError(msg) async def __aenter__(self) -> AxonFlowLangGraphAdapter: """Context manager entry.""" diff --git a/axonflow/client.py b/axonflow/client.py index 90480c8..29b0c2f 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -131,7 +131,6 @@ UsageSummary, ) from axonflow.workflow import ( - AbortWorkflowRequest, ApprovalStatus, CreateWorkflowRequest, CreateWorkflowResponse, From a9528b1e473a4f85835ad7710348bfcd6910213f Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Sat, 17 Jan 2026 13:59:00 +0100 Subject: [PATCH 4/5] fix: resolve mypy type errors with isinstance guards --- axonflow/client.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/axonflow/client.py b/axonflow/client.py index 29b0c2f..269c30e 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -167,7 +167,7 @@ def _parse_datetime(value: str) -> datetime: # Normalize fractional seconds to exactly 6 digits for Python 3.9 compatibility # Handles cases like .35012 (5 digits) -> .350120, or .123456789 (9 digits) -> .123456 - def normalize_fractional_seconds(match: re.Match) -> str: + def normalize_fractional_seconds(match: re.Match[str]) -> str: frac = match.group(1) suffix = match.group(2) # Pad with zeros if less than 6 digits, truncate if more than 6 @@ -245,7 +245,7 @@ def __init__( resolved_endpoint = endpoint or os.environ.get("AXONFLOW_AGENT_URL") if not resolved_endpoint: msg = "endpoint is required (or set AXONFLOW_AGENT_URL environment variable)" - raise ValueError(msg) + raise TypeError(msg) if isinstance(mode, str): mode = Mode(mode) @@ -1355,7 +1355,7 @@ async def get_audit_logs_by_tenant( """ if not tenant_id: msg = "tenant_id is required" - raise ValueError(msg) + raise TypeError(msg) if options is None: options = AuditQueryOptions() @@ -2915,6 +2915,9 @@ async def create_workflow( self._logger.debug("Creating workflow", workflow_name=request.workflow_name) response = await self._orchestrator_request("POST", "/api/v1/workflows", json_data=body) + if not isinstance(response, dict): + msg = "Unexpected response type from workflow creation" + raise TypeError(msg) return CreateWorkflowResponse( workflow_id=response["workflow_id"], @@ -2938,6 +2941,9 @@ async def get_workflow(self, workflow_id: str) -> WorkflowStatusResponse: >>> print(f"Status: {status.status}, Step: {status.current_step_index}") """ response = await self._orchestrator_request("GET", f"/api/v1/workflows/{workflow_id}") + if not isinstance(response, dict): + msg = "Unexpected response type from get workflow" + raise 
TypeError(msg) return self._map_workflow_response(response) async def step_gate( @@ -2996,6 +3002,9 @@ async def step_gate( f"/api/v1/workflows/{workflow_id}/steps/{step_id}/gate", json_data=body, ) + if not isinstance(response, dict): + msg = "Unexpected response type from step gate" + raise TypeError(msg) return StepGateResponse( decision=GateDecision(response["decision"]), @@ -3142,6 +3151,9 @@ async def list_workflows( path = f"{path}?{'&'.join(params)}" response = await self._orchestrator_request("GET", path) + if not isinstance(response, dict): + msg = "Unexpected response type from list workflows" + raise TypeError(msg) workflows = [self._map_workflow_response(w) for w in response.get("workflows", [])] From abb617ce7e4e7c7e5f930d4085ce3e92158be0df Mon Sep 17 00:00:00 2001 From: Saurabh Jain Date: Sat, 17 Jan 2026 14:01:59 +0100 Subject: [PATCH 5/5] fix: restore ValueError for tenant_id validation (not a type error) --- axonflow/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axonflow/client.py b/axonflow/client.py index 269c30e..265ae4e 100644 --- a/axonflow/client.py +++ b/axonflow/client.py @@ -1355,7 +1355,7 @@ async def get_audit_logs_by_tenant( """ if not tenant_id: msg = "tenant_id is required" - raise TypeError(msg) + raise ValueError(msg) if options is None: options = AuditQueryOptions()
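
A minimal end-to-end usage sketch of the API added in this series, assuming an
AxonFlow agent reachable at http://localhost:8080; generate_code() stands in
for a real LangGraph node and is purely illustrative:

    import asyncio

    from axonflow import AxonFlow
    from axonflow.adapters import AxonFlowLangGraphAdapter, WorkflowBlockedError


    async def generate_code() -> str:
        # Placeholder for the real LangGraph node / LLM call.
        return "print('hello world')"


    async def main() -> None:
        async with AxonFlow(endpoint="http://localhost:8080") as client:
            adapter = AxonFlowLangGraphAdapter(client, "code-review-pipeline")
            await adapter.start_workflow(total_steps=1)
            try:
                # Gate the step before running it; with auto_block=True (the default)
                # a 'block' decision raises WorkflowBlockedError instead of returning False.
                if await adapter.check_gate("generate_code", "llm_call", model="gpt-4"):
                    result = await generate_code()
                    await adapter.step_completed("generate_code", output={"code": result})
                await adapter.complete_workflow()
            except WorkflowBlockedError as exc:
                await adapter.abort_workflow(f"Blocked by policy: {exc.reason}")


    asyncio.run(main())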