From a38269750bd39e1348602417d07377181bc83d2b Mon Sep 17 00:00:00 2001 From: John F Rucker Date: Sun, 2 Nov 2025 01:25:08 -0700 Subject: [PATCH] Fix workflow engine recipe loading and evidence tracking --- sma-av-streamlit/core/db/models.py | 34 +++++++++++++++++++++++- sma-av-streamlit/core/recipes/service.py | 13 +++++++-- sma-av-streamlit/core/workflow/engine.py | 7 ++++- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/sma-av-streamlit/core/db/models.py b/sma-av-streamlit/core/db/models.py index fec5103..238564b 100644 --- a/sma-av-streamlit/core/db/models.py +++ b/sma-av-streamlit/core/db/models.py @@ -2,7 +2,15 @@ from datetime import datetime from sqlalchemy import ( - Column, Integer, String, Text, Boolean, DateTime, ForeignKey, Index + Column, + Integer, + String, + Text, + Boolean, + DateTime, + ForeignKey, + Index, + JSON, ) from sqlalchemy.orm import declarative_base, relationship @@ -13,6 +21,7 @@ class Agent(Base): id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(255), nullable=False, unique=True, index=True) domain = Column(String(255), nullable=True) + config_json = Column(JSON, nullable=False, default=dict) created_at = Column(DateTime, nullable=False, default=datetime.utcnow) def __repr__(self) -> str: @@ -40,12 +49,34 @@ class Run(Base): agent = relationship("Agent", lazy="joined") recipe = relationship("Recipe", lazy="joined") + evidence = relationship( + "Evidence", + back_populates="run", + cascade="all, delete-orphan", + passive_deletes=True, + order_by="Evidence.id", + ) def __repr__(self) -> str: return f"" Index("ix_runs_status_created", Run.status, Run.created_at.desc()) + +class Evidence(Base): + __tablename__ = "evidence" + id = Column(Integer, primary_key=True, autoincrement=True) + run_id = Column(Integer, ForeignKey("runs.id", ondelete="CASCADE"), nullable=False, index=True) + kind = Column(String(64), nullable=False, default="json") + label = Column(String(255), nullable=True) + payload = Column(JSON, nullable=True) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + + run = relationship("Run", back_populates="evidence") + + def __repr__(self) -> str: + return f"" + class Tool(Base): __tablename__ = "tools" id = Column(Integer, primary_key=True, autoincrement=True) @@ -95,4 +126,5 @@ def __repr__(self) -> str: "Tool", "ChatThread", "ChatMessage", + "Evidence", ] diff --git a/sma-av-streamlit/core/recipes/service.py b/sma-av-streamlit/core/recipes/service.py index db96dab..8e6f3c0 100644 --- a/sma-av-streamlit/core/recipes/service.py +++ b/sma-av-streamlit/core/recipes/service.py @@ -7,7 +7,9 @@ import yaml -RECIPES_DIR = Path("recipes") +PACKAGE_ROOT = Path(__file__).resolve().parents[2] +DEFAULT_RECIPES_DIR = PACKAGE_ROOT / "recipes" +RECIPES_DIR = DEFAULT_RECIPES_DIR __all__ = [ "ensure_recipes_dir", @@ -60,6 +62,10 @@ def load_recipe_dict(source: Union[dict, str, Path, Any]) -> dict: p = Path(str(p_val)) if p.exists(): yaml_text = _read_text_from_path(p) + elif not p.is_absolute(): + candidate = DEFAULT_RECIPES_DIR / p + if candidate.exists(): + yaml_text = _read_text_from_path(candidate) if yaml_text is None: y = getattr(source, "yaml", None) if y: @@ -70,7 +76,10 @@ def load_recipe_dict(source: Union[dict, str, Path, Any]) -> dict: p = Path(str(source)) candidate_paths = [p] if not p.is_absolute(): - candidate_paths.append(RECIPES_DIR / p) + candidate_paths.extend([ + RECIPES_DIR / p, + DEFAULT_RECIPES_DIR / p, + ]) for candidate in candidate_paths: if candidate.exists(): yaml_text = _read_text_from_path(candidate) diff --git a/sma-av-streamlit/core/workflow/engine.py b/sma-av-streamlit/core/workflow/engine.py index e6dd328..ef456ae 100644 --- a/sma-av-streamlit/core/workflow/engine.py +++ b/sma-av-streamlit/core/workflow/engine.py @@ -31,6 +31,11 @@ def execute_recipe_run(db: Session, agent_id: int, recipe_id: int) -> Run: db.add(run); db.commit(); db.refresh(run) recipe_dict = load_recipe_dict(recipe.yaml_path) for phase, message in run_workflow_phases(recipe_dict): - attach_json(db, run_id=run.id, payload={"phase": phase, "message": f"{agent.name}: {message}"}) + attach_json( + db, + run_id=run.id, + obj={"phase": phase, "message": f"{agent.name}: {message}"}, + kind="phase", + ) run.status = "completed"; db.commit(); db.refresh(run) return run