Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The following agents are available.
| flux-build | optimized to build containers for the Flux Operator | fractale_agents.kubernetes.FluxBuildAgent |
| result_parse | Parse specific metrics from output logs | fractale_agents.parsers.ResultParserAgent |
| optimize | General optimization agent | fractale_agents.optimize.OptimizeAgent |
| jobspec-transform | Job specification transformation agent | fractale_agents.hpc.job.JobspecTransformAgent |

The general prompt agent is provisioned by fractale directly, `fractale.agents.general.PromptAgent`.
Would you like to see an expert added? Please open an issue and let us know.
Expand Down
112 changes: 112 additions & 0 deletions fractale_agents/agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
from typing import Any, Awaitable, Callable, Dict, Optional

import fractale.utils as utils
from fractale.logger import logger


class BaseSubAgent:
    """
    Common base for autonomous sub-agents managing internal turn-based loops.

    Subclasses supply a persona/system prompt, a goal, and context;
    ``execute_loop`` drives the conversation with the LLM backend until the
    agent emits a terminal JSON decision, a callback force-stops it, or the
    turn budget is exhausted.
    """

    async def execute_loop(
        self,
        system_prompt: str,
        goal: str,
        context: str,
        max_turns: int = 100,
        process_callback: Optional[
            Callable[[Dict[str, Any]], Awaitable[Optional[Dict[str, Any]]]]
        ] = None,
    ) -> Dict[str, Any]:
        """
        Run the turn-based agent loop.

        Args:
            system_prompt: Persona/requirements text placed at the top of the
                first prompt.
            goal: User goal, appended under a "### USER GOAL" heading.
            context: Supporting material, appended under "### CONTEXT".
            max_turns: Hard cap on conversation turns before giving up.
            process_callback: Optional async hook invoked with each parsed
                JSON payload. It may return ``{"action": "stop", ...}`` to
                force-stop the loop, or ``{"instruction": "..."}`` to override
                the agent's next prompt.

        Returns:
            The agent's final JSON payload augmented with ``turns_taken``, or
            a ``{"status": "limit_reached", ...}`` dict when ``max_turns`` is
            exceeded.
        """
        # Imported lazily: importing at module scope would create a circular
        # import with fractale.agents.base.
        from fractale.agents.base import backend

        current_prompt = f"{system_prompt}\n\n### USER GOAL\n{goal}\n\n### CONTEXT\n{context}"
        turn = 0

        while turn < max_turns:
            turn += 1
            logger.info(f"🧠 [{self.__class__.__name__}] Turn {turn}/{max_turns}")
            logger.panel(current_prompt, title="Agent Prompt", color="green")

            response_text, tool_calls = backend.generate_response(
                prompt=current_prompt,
                use_tools=True,
                memory=True,
            )

            # Handle empty responses by nudging the model to try again.
            if not response_text and not tool_calls:
                current_prompt = "Your last response was empty. Please provide your next tool call or final response."
                continue

            # Case 1: execute tool calls and feed their outputs back as the
            # next prompt.
            if tool_calls:
                current_prompt = ""
                for call in tool_calls:
                    tool_result = await backend.call_tool(call)
                    safe_content = utils.clean_output(tool_result.content)
                    current_prompt += f"\nTool '{call['name']}' returned:\n{safe_content}"
                continue

            # Case 2: parse the agent's JSON decision from the text response.
            try:
                clean_json = utils.extract_code_block(response_text)
                if not clean_json:
                    current_prompt = (
                        "Please provide your final status/decision in a JSON markdown code block."
                    )
                    continue

                data = json.loads(clean_json)

                # A reason can be provided in data
                if data.get("reason"):
                    logger.panel(
                        title=f"🧠[{self.__class__.__name__}] Thinking",
                        message=data["reason"],
                        color="blue",
                    )

                # Are we processing custom callback?
                if process_callback:
                    # The callback MUST return back a json response:
                    # {"action": "stop", "instruction": "...."}
                    instruction = await process_callback(data)

                    if instruction:
                        if instruction.get("action") == "stop":
                            logger.info(
                                f"🛑 [{self.__class__.__name__}] Force stopped by callback."
                            )
                            # BUG FIX: previously referenced the undefined
                            # name ``turns`` (NameError); use the loop counter.
                            data.update(
                                {
                                    "turns_taken": turn,
                                    "reason": "Interrupted by caller",
                                    "goal": goal,
                                }
                            )
                            return data

                        elif "instruction" in instruction:
                            # Override the agent's next prompt based on caller feedback
                            current_prompt = instruction["instruction"]
                            continue

                # Normal class-specific termination check. The original
                # expression ('"action" in data or data.get("action") == "stop"')
                # had a dead second clause; presence of an "action" key alone
                # terminates, which is preserved here.
                # NOTE(review): confirm whether only action == "stop" should
                # terminate, and what should happen when the agent returns a
                # final JSON payload without an "action" key (currently the
                # same prompt is re-sent).
                if "action" in data:
                    logger.info(f"✅ [{self.__class__.__name__}] Goal reached.")
                    data["turns_taken"] = turn
                    return data

            except (json.JSONDecodeError, KeyError):
                current_prompt = "Your response did not contain valid JSON. Please provide the required JSON structure."

        # Turn budget exhausted without a terminal decision.
        return {
            "status": "limit_reached",
            "message": f"Exceeded {max_turns} turns.",
            "goal": goal,
        }
Empty file added fractale_agents/hpc/__init__.py
Empty file.
1 change: 1 addition & 0 deletions fractale_agents/hpc/job/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .transform import JobspecTransformAgent
141 changes: 141 additions & 0 deletions fractale_agents/hpc/job/transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import json
from typing import Any, Awaitable, Callable, Dict, Optional

import fractale_agents.utils as utils
from fractale_agents.agent import BaseSubAgent
from fractale_agents.logger import logger

# Persona/instruction template for the transform agent. This is a plain
# printf-style template filled later with (from_manager, to_manager) via the
# % operator — it is deliberately NOT an f-string, so literal braces (e.g. a
# JSON example) can be added safely in the future.
transform_prompt = """### PERSONA
You are an autonomous build sub-agent with expertise in transforming job specifications.

### REQUIREMENTS & CONSTRAINTS
- You MUST not make up directives that do not exist.
- You MUST preserve as many options as possible from the original.
- If there is a directive that does not translate, you MUST leave it out and add a comment about the performance implications of the omission.
- If you have a tool available, you MUST use it to validate the conversion.
- If you do not have a tool available, you MUST provide a "reason" the script is valid.

### INSTRUCTIONS
1. Analyze the original script provided in the CONTEXT.
2. Write a new script that converts from %s to %s.
3. When you have your finished job specification, return it in a JSON markdown code block with the key "jobspec".
4. If the input script is not a workload manager batch file, you MUST return the JSON "jobspec" value as "noop".
"""


class JobspecTransformAgent(BaseSubAgent):
    """
    Agent optimized to transform workload manager job specifications (e.g., Slurm to Flux).
    """

    name = "transform"
    description = (
        "An autonomous expert agent that converts job specifications (batch scripts) "
        "between different workload managers (e.g., SLURM to Flux). It can validate "
        "conversions and iteratively fix errors based on previous attempts."
    )

    # JSON schema describing the accepted call arguments.
    input_schema = {
        "type": "object",
        "properties": {
            "script": {
                "type": "string",
                "description": "The original batch script or job specification to convert.",
            },
            "from_manager": {
                "type": "string",
                "description": "The name of the source workload manager (e.g., 'slurm').",
            },
            "to_manager": {
                "type": "string",
                "description": "The name of the target workload manager (e.g., 'flux').",
            },
            "fmt": {
                "type": "string",
                "default": "batch",
                "description": "Target format: 'batch' or 'jobspec' (canonical JSON representation).",
            },
            "error": {
                "type": ["string", "null"],
                "description": "Error message from a previous failed validation attempt (optional).",
            },
            "previous_jobspec": {
                "type": ["string", "null"],
                "description": "The previously generated jobspec that caused the error (optional).",
            },
            "max_turns": {
                "type": "integer",
                "default": 100,
                "description": "Max turns for the optimization/debugging loop.",
            },
        },
        "required": ["script", "from_manager", "to_manager"],
        "annotations": {"fractale.type": "agent"},
    }

    # JSON schema describing the returned payload.
    output_schema = {
        "type": "object",
        "properties": {
            "status": {"type": "string"},
            "jobspec": {
                "type": "string",
                "description": "The transformed job specification, or 'noop' if the input was invalid.",
            },
            "turns_taken": {"type": "integer"},
            "message": {"type": "string"},
        },
        "required": ["status", "jobspec"],
    }

    async def __call__(
        self,
        script: str,
        from_manager: str,
        to_manager: str,
        fmt: str = "batch",
        error: Optional[str] = None,
        previous_jobspec: Optional[str] = None,
        max_turns: int = 100,
        process_callback: Optional[
            Callable[[Dict[str, Any]], Awaitable[Optional[Dict[str, Any]]]]
        ] = None,
    ) -> Dict[str, Any]:
        """
        Execute the job specification transformation loop.

        Builds the goal (fresh conversion vs. fixing a previously failed
        attempt), the system prompt, and the context payload, then delegates
        to the turn-based loop inherited from BaseSubAgent.
        """
        # Goal: a retry goal when a prior attempt failed validation,
        # otherwise the initial conversion goal.
        if error is None:
            goal = (
                f"Convert the provided job specification from '{from_manager}' to '{to_manager}'. "
                f"The desired output format is a '{fmt}' script."
            )
        else:
            goal = (
                f"You previously attempted to convert a job specification and it did not validate. "
                f"Analyze the error and fix it.\n\n### ERROR\n{error}"
            )

        # Context payload: the original script plus any previous (failed)
        # attempt the agent should learn from.
        sections = [f"### ORIGINAL SCRIPT\n{script}\n"]
        if previous_jobspec is not None:
            sections.append(f"\n### PREVIOUS ATTEMPT\n{previous_jobspec}\n")

        # Run the autonomous loop; the persona template is filled with the
        # source and target manager names.
        result = await self.execute_loop(
            system_prompt=transform_prompt % (from_manager, to_manager),
            goal=goal,
            context="".join(sections),
            max_turns=max_turns,
            process_callback=process_callback,
        )

        # Fallback safeguard so the output always satisfies output_schema
        # even if the agent never produced a jobspec.
        result.setdefault("jobspec", "noop")
        return result
Loading