diff --git a/examples/workflows/maker.py b/examples/workflows/maker.py new file mode 100644 index 00000000..a20622cd --- /dev/null +++ b/examples/workflows/maker.py @@ -0,0 +1,156 @@ +""" +MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. + +This example demonstrates the MAKER workflow pattern for achieving high +reliability through statistical consensus voting. + +Based on the paper: + "Solving a Million-Step LLM Task with Zero Errors" + Meyerson et al., 2025 + https://arxiv.org/abs/2511.09030 + +Key Concepts: +------------- +1. **First-to-ahead-by-k Voting**: Multiple samples are drawn from a worker + agent. The first response to achieve a k-vote margin over all alternatives + wins. This provides provable error bounds. + +2. **Red-Flagging**: Responses that show signs of confusion (too long, + malformed) are discarded before voting, improving effective success rate. + +3. **Cost-Effective Reliability**: By trading compute (multiple samples) for + accuracy (statistical consensus), cheap models can achieve high reliability. + +When to Use MAKER: +------------------ +MAKER is designed for **long chains of simple steps** where errors compound: + +Good use cases: +- **ETL pipelines**: 1000s of row transformations - one bad parse = corrupted data +- **Code migration**: 1000s of file changes - one syntax error = build fails +- **Document processing**: 1000s of pages - one missed field = compliance failure +- **Data validation**: Millions of records - one wrong validation = bad data in prod +- **Automated testing**: 1000s of assertions - one false positive = wasted debugging +- **Cost optimization**: Cheap model + voting can replace expensive model + +When NOT to use MAKER: +- Single classifications (just use a good model - 95% accuracy is fine) +- Creative/open-ended tasks (no "correct" answer to vote on) +- Complex reasoning (need smarter model, not more samples) +- Tasks where occasional errors are acceptable + +The Math: +--------- +- 95% per-step accuracy over 100 steps = 0.6% overall success (0.95^100) +- 99.9% per-step accuracy (with MAKER) over 100 steps = 90% overall success +- For million-step tasks, even 99% per-step fails; MAKER enables 99.99%+ + +Demo Use Case: +-------------- +This example shows customer message intent classification. While modern LLMs +are quite consistent on this task (you'll see 3:0 votes), the mechanism +demonstrates how voting works. In production with harder tasks or longer +chains, MAKER's value becomes critical. + +Usage: +------ + uv run examples/workflows/maker.py + +Try modifying k (voting margin) and observe how it affects reliability vs cost. +""" + +import asyncio + +from fast_agent import FastAgent + +fast = FastAgent("MAKER Example") + + +# Define a classifier using a cheap model (Haiku) - may give inconsistent results +# on ambiguous messages, which is why we wrap it with MAKER for reliability +@fast.agent( + name="classifier", + model="claude-3-haiku-20240307", + instruction="""You are a customer support intent classifier. +Classify the customer message into exactly one of: COMPLAINT, QUESTION, REQUEST, FEEDBACK. +Respond with ONLY the single word classification, nothing else. + +Examples: +- "This product is broken!" → COMPLAINT +- "How do I reset my password?"
→ QUESTION +- "Please cancel my subscription" → REQUEST +- "Just wanted to say I love the new feature" → FEEDBACK""", +) +# Wrap with MAKER for reliable, consistent classification +@fast.maker( + name="reliable_classifier", + worker="classifier", + k=3, # Require 3-vote margin for consensus + max_samples=10, # Max attempts before falling back to plurality + match_strategy="normalized", # Ignore case/whitespace differences + red_flag_max_length=20, # Discard verbose responses (should be one word) +) +async def main(): + """Demonstrate MAKER voting for reliable intent classification.""" + async with fast.run() as agent: + print("=" * 70) + print("MAKER: Massively decomposed Agentic processes") + print(" with K-voting Error Reduction") + print("=" * 70) + print() + print("This example classifies customer messages where intent is ambiguous.") + print("MAKER voting ensures consistent routing even for edge cases.") + print() + + # Ambiguous customer messages where intent is unclear + test_cases = [ + "I've been waiting for 3 days now.", # Complaint? Status question? + "Can someone explain how this works?", # Question? Request for help? + "This isn't what I expected.", # Complaint? Feedback? + "I'd like to speak to a manager.", # Complaint? Request? + "Why does this keep happening?", # Complaint? Question? + "Just wanted to let you know about this.", # Feedback? Complaint? + "Is there any way to get a refund?", # Question? Request? + "The new update changed everything.", # Complaint? Feedback? + ] + + # Collect all results first + results = [] + for text in test_cases: + result = await agent.reliable_classifier.send(text) + stats = agent.reliable_classifier.last_result + results.append((text, result, stats)) + + # Display all results together + print("-" * 70) + print(f"{'Text':<50} {'Result':<10} {'Samples':<8} {'Votes'}") + print("-" * 70) + + for text, result, stats in results: + votes_str = "" + samples = "" + if stats: + votes_str = ", ".join(f"{k}:{v}" for k, v in stats.votes.items()) + samples = str(stats.total_samples) + + print(f"{text:<50} {result:<10} {samples:<8} {votes_str}") + + print("-" * 70) + print() + print("Notice how MAKER provides consistent routing decisions even for") + print("ambiguous messages by voting across multiple samples.") + print() + + # Summary statistics + total_samples = sum(r[2].total_samples for r in results if r[2]) + all_converged = all(r[2].converged for r in results if r[2]) + print("Summary:") + print(f" - Total API calls: {total_samples}") + print(f" - All converged: {all_converged}") + print(f" - Texts classified: {len(results)}") + print() + print("=" * 70) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/fast_agent/agents/agent_types.py b/src/fast_agent/agents/agent_types.py index 9727e376..82bb6627 100644 --- a/src/fast_agent/agents/agent_types.py +++ b/src/fast_agent/agents/agent_types.py @@ -27,6 +27,7 @@ class AgentType(StrEnum): ROUTER = auto() CHAIN = auto() ITERATIVE_PLANNER = auto() + MAKER = auto() @dataclass diff --git a/src/fast_agent/agents/workflow/maker_agent.py b/src/fast_agent/agents/workflow/maker_agent.py new file mode 100644 index 00000000..0adc5425 --- /dev/null +++ b/src/fast_agent/agents/workflow/maker_agent.py @@ -0,0 +1,379 @@ +""" +MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. 
+ +Implementation based on the paper: +"Solving a Million-Step LLM Task with Zero Errors" (arXiv:2511.09030) +https://arxiv.org/abs/2511.09030 + +This workflow implements first-to-ahead-by-k voting for statistical error +correction, enabling high reliability with cost-effective models. The key +insight is that by sampling multiple responses and requiring a k-vote margin +for consensus, the per-step probability of error decreases exponentially in k, +while the margin k (and hence the sampling cost per step) needs to grow only +logarithmically with the number of steps. + +Key concepts from the paper: +- Maximal Agentic Decomposition (MAD): Break tasks into single-step subtasks +- First-to-ahead-by-k voting: Winner needs k more votes than runner-up +- Red-flagging: Discard suspicious outputs (too long, malformed) before voting +""" + +from collections import defaultdict +from enum import StrEnum +from typing import Any, Callable, List, Optional, Tuple, Type + +from mcp import Tool +from opentelemetry import trace +from pydantic import BaseModel, Field + +from fast_agent.agents.agent_types import AgentConfig, AgentType +from fast_agent.agents.llm_agent import LlmAgent +from fast_agent.core.exceptions import AgentConfigError +from fast_agent.core.logging.logger import get_logger +from fast_agent.core.prompt import Prompt +from fast_agent.interfaces import AgentProtocol, ModelT +from fast_agent.types import PromptMessageExtended, RequestParams + +logger = get_logger(__name__) + + +class MatchStrategy(StrEnum): + """ + Strategies for comparing responses during voting. + + The choice of strategy affects how responses are grouped for voting: + - EXACT: Responses must match character-for-character + - NORMALIZED: Whitespace and case differences are ignored + - STRUCTURED: JSON responses are parsed and compared structurally + """ + + EXACT = "exact" + NORMALIZED = "normalized" + STRUCTURED = "structured" + + +class MakerResult(BaseModel): + """ + Result of a MAKER voting process. + + Provides transparency into the voting outcome for debugging and analysis. + """ + + winner: str = Field(description="The winning response text") + votes: dict[str, int] = Field( + default_factory=dict, description="Vote counts per unique response" + ) + total_samples: int = Field(default=0, description="Total samples drawn") + discarded_samples: int = Field( + default=0, description="Samples discarded due to red-flags" + ) + margin: int = Field(default=0, description="Winning margin achieved") + converged: bool = Field( + default=False, description="Whether k-margin consensus was achieved" + ) + + +class MakerAgent(LlmAgent): + """ + MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. + + Implements first-to-ahead-by-k voting for statistical error correction. + Multiple samples are drawn from a worker agent, and the first response + to achieve a k-vote margin over all alternatives wins.
+ + This approach enables: + - High reliability with cheap/small models + - Per-step cost that grows only logarithmically with chain length + - Provable error bounds based on per-step success rate + + Reference: "Solving a Million-Step LLM Task with Zero Errors" + https://arxiv.org/abs/2511.09030 + """ + + @property + def agent_type(self) -> AgentType: + """Return the type of this agent.""" + return AgentType.MAKER + + def __init__( + self, + config: AgentConfig, + worker_agent: AgentProtocol, + k: int = 3, + max_samples: int = 50, + match_strategy: MatchStrategy = MatchStrategy.EXACT, + match_fn: Callable[[str], str] | None = None, + red_flag_max_length: int | None = None, + red_flag_validator: Callable[[str], bool] | None = None, + context: Optional[Any] = None, + **kwargs, + ) -> None: + """ + Initialize the MAKER agent. + + Args: + config: Agent configuration + worker_agent: The agent to sample from for voting + k: Margin required to declare a winner (first-to-ahead-by-k). + Higher k = more reliable but more samples needed. + Paper recommends k >= 3 for high reliability. + max_samples: Maximum samples before falling back to plurality vote + match_strategy: How to compare responses for voting + match_fn: Custom function to normalize responses for comparison. + If provided, overrides match_strategy. + red_flag_max_length: Discard responses longer than this (characters). + Per the paper, overly long responses correlate + with errors. + red_flag_validator: Custom validator function. Return False to + discard the response (red-flag it). + context: Optional context object + """ + super().__init__(config, context=context, **kwargs) + + if not worker_agent: + raise AgentConfigError("Worker agent must be provided") + if k < 1: + raise AgentConfigError("k must be at least 1") + if max_samples < k: + raise AgentConfigError("max_samples must be at least k") + + self.worker_agent = worker_agent + self.k = k + self.max_samples = max_samples + self.match_strategy = match_strategy + self.match_fn = match_fn + self.red_flag_max_length = red_flag_max_length + self.red_flag_validator = red_flag_validator + + # Result tracking + self.last_result: MakerResult | None = None + + def _normalize_response(self, response: str) -> str: + """ + Normalize response for comparison based on configured strategy. + + Args: + response: Raw response text + + Returns: + Normalized response for vote counting + """ + if self.match_fn: + return self.match_fn(response) + + match self.match_strategy: + case MatchStrategy.EXACT: + return response + case MatchStrategy.NORMALIZED: + return " ".join(response.lower().split()) + case MatchStrategy.STRUCTURED: + import json + + try: + parsed = json.loads(response) + return json.dumps(parsed, sort_keys=True) + except json.JSONDecodeError: + return response + + return response + + def _is_red_flagged(self, response: str) -> bool: + """ + Check if response should be discarded (red-flagged).
+ + Per the MAKER paper, red-flagging improves effective success rate + by discarding responses that show signs of confusion: + - Overly long responses (model went off track) + - Malformed responses (parsing issues indicate confusion) + + Args: + response: Response text to check + + Returns: + True if response should be discarded + """ + if self.red_flag_max_length and len(response) > self.red_flag_max_length: + logger.debug( + f"Red-flagged: response length {len(response)} > {self.red_flag_max_length}" + ) + return True + + if self.red_flag_validator and not self.red_flag_validator(response): + logger.debug("Red-flagged: custom validator returned False") + return True + + return False + + def _check_winner(self, votes: dict[str, int]) -> str | None: + """ + Check if any response has achieved k-margin victory. + + First-to-ahead-by-k: winner needs k more votes than the runner-up. + + Args: + votes: Current vote counts + + Returns: + Winning response key if k-margin achieved, None otherwise + """ + if not votes: + return None + + sorted_items = sorted(votes.items(), key=lambda x: x[1], reverse=True) + leader_key, leader_votes = sorted_items[0] + runner_up_votes = sorted_items[1][1] if len(sorted_items) > 1 else 0 + + if leader_votes - runner_up_votes >= self.k: + return leader_key + + return None + + async def generate_impl( + self, + messages: List[PromptMessageExtended], + request_params: RequestParams | None = None, + tools: List[Tool] | None = None, + ) -> PromptMessageExtended: + """ + Generate a response using first-to-ahead-by-k voting. + + Samples from the worker agent until one response achieves a k-vote + margin over all alternatives, or max_samples is reached. + + Args: + messages: Input messages + request_params: Optional request parameters + tools: Optional tools (not forwarded; the worker uses its own tools) + + Returns: + The winning response + """ + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span(f"Maker: '{self._name}' generate"): + votes: dict[str, int] = defaultdict(int) + response_map: dict[str, PromptMessageExtended] = {} + total_samples = 0 + discarded_samples = 0 + + while total_samples < self.max_samples: + async with self.workflow_telemetry.start_step( + "maker.sample", + server_name=self.name, + arguments={ + "agent": self.worker_agent.name, + "sample": total_samples + 1, + "current_votes": dict(votes), + }, + ) as step: + response = await self.worker_agent.generate( + messages, request_params + ) + response_text = response.last_text() or "" + total_samples += 1 + + # Red-flag check + if self._is_red_flagged(response_text): + discarded_samples += 1 + await step.finish( + False, text=f"Sample {total_samples} red-flagged, discarded" + ) + continue + + # Normalize and record vote + normalized = self._normalize_response(response_text) + votes[normalized] += 1 + response_map[normalized] = response + + await step.finish( + True, + text=f"Sample {total_samples}: {votes[normalized]} votes for this response", + ) + + # Check for k-margin winner + winner_key = self._check_winner(votes) + if winner_key: + sorted_votes = sorted(votes.values(), reverse=True) + margin = sorted_votes[0] - ( + sorted_votes[1] if len(sorted_votes) > 1 else 0 + ) + + self.last_result = MakerResult( + winner=winner_key, + votes=dict(votes), + total_samples=total_samples, + discarded_samples=discarded_samples, + margin=margin, + converged=True, + ) + + logger.debug( + f"MAKER converged: {votes[winner_key]} votes, " + f"margin {margin}, {total_samples} samples" + ) + return response_map[winner_key] + + # 
Max samples reached - fall back to plurality + logger.warning( + f"MAKER: max_samples ({self.max_samples}) reached without " + f"k-margin ({self.k}) consensus, using plurality" + ) + + if not votes: + # All samples were red-flagged + raise AgentConfigError( + f"All {total_samples} samples were red-flagged. " + "Consider relaxing red-flag criteria." + ) + + winner_key = max(votes, key=lambda x: votes[x]) + sorted_votes = sorted(votes.values(), reverse=True) + margin = sorted_votes[0] - ( + sorted_votes[1] if len(sorted_votes) > 1 else 0 + ) + + self.last_result = MakerResult( + winner=winner_key, + votes=dict(votes), + total_samples=total_samples, + discarded_samples=discarded_samples, + margin=margin, + converged=False, + ) + + return response_map[winner_key] + + async def structured_impl( + self, + messages: List[PromptMessageExtended], + model: Type[ModelT], + request_params: RequestParams | None = None, + ) -> Tuple[ModelT | None, PromptMessageExtended]: + """ + Generate a voted response and parse into structured format. + + Args: + messages: Input messages + model: Pydantic model class for structured output + request_params: Optional request parameters + + Returns: + Tuple of (parsed model or None, raw response) + """ + response = await self.generate_impl(messages, request_params) + return await self.worker_agent.structured( + [Prompt.user(response.all_text())], model, request_params + ) + + async def initialize(self) -> None: + """Initialize the agent and its worker agent.""" + await super().initialize() + if not self.worker_agent.initialized: + await self.worker_agent.initialize() + self.initialized = True + + async def shutdown(self) -> None: + """Shutdown the agent and its worker agent.""" + await super().shutdown() + try: + await self.worker_agent.shutdown() + except Exception as e: + logger.warning(f"Error shutting down worker agent: {str(e)}") diff --git a/src/fast_agent/core/direct_decorators.py b/src/fast_agent/core/direct_decorators.py index 6bd3a9f0..e857f182 100644 --- a/src/fast_agent/core/direct_decorators.py +++ b/src/fast_agent/core/direct_decorators.py @@ -81,6 +81,15 @@ class DecoratedEvaluatorOptimizerProtocol(DecoratedAgentProtocol[P, R], Protocol _evaluator: str +# Protocol for maker functions +class DecoratedMakerProtocol(DecoratedAgentProtocol[P, R], Protocol): + """Protocol for decorated MAKER functions with additional metadata.""" + + _worker: str + _k: int + _max_samples: int + + def _fetch_url_content(url: str) -> str: """ Fetch content from a URL. @@ -686,3 +695,78 @@ def evaluator_optimizer( refinement_instruction=refinement_instruction, default=default, ) + + +def maker( + self, + name: str, + *, + worker: str, + k: int = 3, + max_samples: int = 50, + match_strategy: str = "exact", + red_flag_max_length: int | None = None, + instruction: str | Path | AnyUrl | None = None, + default: bool = False, +) -> Callable[[Callable[P, Coroutine[Any, Any, R]]], Callable[P, Coroutine[Any, Any, R]]]: + """ + Decorator to create a MAKER agent for statistical error correction via voting. + + MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. + + Based on the paper "Solving a Million-Step LLM Task with Zero Errors" + (arXiv:2511.09030). Implements first-to-ahead-by-k voting where multiple + samples are drawn from a worker agent, and the first response to achieve + a k-vote margin over alternatives wins. + + This enables high reliability with cost-effective models by trading + compute (multiple samples) for accuracy (statistical consensus). 
+ + Args: + name: Name of the MAKER agent + worker: Name of the agent to sample from for voting + k: Margin required to declare winner (first-to-ahead-by-k). + Higher k = more reliable but more samples needed. + Default of 3 provides strong guarantees for most use cases. + max_samples: Maximum samples before falling back to plurality vote + match_strategy: How to compare responses for voting: + - "exact": Character-for-character match + - "normalized": Ignore whitespace and case differences + - "structured": Parse as JSON and compare structurally + red_flag_max_length: Discard responses longer than this (characters). + Per the paper, overly long responses correlate + with errors. None = no length limit. + instruction: Base instruction for the MAKER agent + default: Whether to mark this as the default agent + + Returns: + A decorator that registers the MAKER agent + + Example: + @fast.agent(name="calculator", instruction="Return only the numeric result") + @fast.maker(name="reliable_calc", worker="calculator", k=3) + async def main(): + async with fast.run() as agent: + result = await agent.reliable_calc.send("What is 17 * 23?") + """ + default_instruction = """ + MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. + Implements statistical error correction through voting consensus. + Multiple samples are drawn and the first response to achieve a k-vote + margin wins, ensuring high reliability even with cost-effective models. + """ + resolved_instruction = _resolve_instruction(instruction or default_instruction) + + return _decorator_impl( + self, + AgentType.MAKER, + name=name, + instruction=resolved_instruction, + servers=[], # MAKER doesn't connect to servers directly + worker=worker, + k=k, + max_samples=max_samples, + match_strategy=match_strategy, + red_flag_max_length=red_flag_max_length, + default=default, + ) diff --git a/src/fast_agent/core/direct_factory.py b/src/fast_agent/core/direct_factory.py index 036799ed..aa0ba093 100644 --- a/src/fast_agent/core/direct_factory.py +++ b/src/fast_agent/core/direct_factory.py @@ -434,6 +434,35 @@ async def create_agents_by_type( await evaluator_optimizer.initialize() result_agents[name] = evaluator_optimizer + elif agent_type == AgentType.MAKER: + # MAKER: Massively decomposed Agentic processes with K-voting Error Reduction + from fast_agent.agents.workflow.maker_agent import MakerAgent, MatchStrategy + + worker_name = agent_data["worker"] + if worker_name not in active_agents: + raise AgentConfigError(f"Worker agent {worker_name} not found") + + worker_agent = active_agents[worker_name] + + # Parse match strategy + match_strategy_str = agent_data.get("match_strategy", "exact") + match_strategy = MatchStrategy(match_strategy_str) + + # Create the MAKER agent + maker_agent = MakerAgent( + config=config, + context=app_instance.context, + worker_agent=worker_agent, + k=agent_data.get("k", 3), + max_samples=agent_data.get("max_samples", 50), + match_strategy=match_strategy, + red_flag_max_length=agent_data.get("red_flag_max_length"), + ) + + # Initialize the agent + await maker_agent.initialize() + result_agents[name] = maker_agent + else: raise ValueError(f"Unknown agent type: {agent_type}") diff --git a/src/fast_agent/core/fastagent.py b/src/fast_agent/core/fastagent.py index 5ac88806..84c2d68f 100644 --- a/src/fast_agent/core/fastagent.py +++ b/src/fast_agent/core/fastagent.py @@ -47,6 +47,9 @@ from fast_agent.core.direct_decorators import ( iterative_planner as orchestrator2_decorator, ) +from 
fast_agent.core.direct_decorators import ( + maker as maker_decorator, +) from fast_agent.core.direct_decorators import ( orchestrator as orchestrator_decorator, ) @@ -468,6 +471,19 @@ def evaluator_optimizer( default: bool = False, ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: ... + def maker( + self, + name: str, + *, + worker: str, + k: int = 3, + max_samples: int = 50, + match_strategy: str = "exact", + red_flag_max_length: int | None = None, + instruction: str | Path | AnyUrl | None = None, + default: bool = False, + ) -> Callable[[Callable[P, Awaitable[R]]], Callable[P, Awaitable[R]]]: ... + # Runtime bindings (actual implementations) agent = agent_decorator custom = custom_decorator @@ -477,6 +493,7 @@ def evaluator_optimizer( chain = chain_decorator parallel = parallel_decorator evaluator_optimizer = evaluator_optimizer_decorator + maker = maker_decorator def _get_acp_server_class(self): """Import and return the ACP server class with helpful error handling.""" diff --git a/tests/integration/workflow/maker/fastagent.config.yaml b/tests/integration/workflow/maker/fastagent.config.yaml new file mode 100644 index 00000000..2d36b9a3 --- /dev/null +++ b/tests/integration/workflow/maker/fastagent.config.yaml @@ -0,0 +1,2 @@ +mcp: + name: maker_tests diff --git a/tests/integration/workflow/maker/test_maker_agent.py b/tests/integration/workflow/maker/test_maker_agent.py new file mode 100644 index 00000000..c3969416 --- /dev/null +++ b/tests/integration/workflow/maker/test_maker_agent.py @@ -0,0 +1,241 @@ +""" +Integration tests for the MAKER workflow agent. + +MAKER: Massively decomposed Agentic processes with K-voting Error Reduction. + +Based on the paper "Solving a Million-Step LLM Task with Zero Errors" +(arXiv:2511.09030). Tests verify the first-to-ahead-by-k voting mechanism +and red-flag filtering for statistical error correction. 
+""" + +import pytest + +from fast_agent.core.prompt import Prompt +from fast_agent.llm.internal.passthrough import FIXED_RESPONSE_INDICATOR + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_basic_voting_consensus(fast_agent): + """Test that identical responses achieve immediate k-margin consensus.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=2) + async def agent_function(): + async with fast.run() as agent: + # Prime worker to return consistent responses + consistent_response = f"{FIXED_RESPONSE_INDICATOR}42" + for _ in range(3): + await agent.worker._llm.generate([Prompt.user(consistent_response)]) + + result = await agent.voter.send("What is the answer?") + + assert "42" in result + assert agent.voter.last_result is not None + assert agent.voter.last_result.converged is True + assert agent.voter.last_result.margin >= 2 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_first_to_ahead_by_k(fast_agent): + """Test that voting requires k-margin to declare winner.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=3, max_samples=10) + async def agent_function(): + async with fast.run() as agent: + # Prime responses: A gets 4 votes, B gets 1 vote + # A should win with margin of 3 + responses = [ + f"{FIXED_RESPONSE_INDICATOR}A", + f"{FIXED_RESPONSE_INDICATOR}A", + f"{FIXED_RESPONSE_INDICATOR}B", + f"{FIXED_RESPONSE_INDICATOR}A", + f"{FIXED_RESPONSE_INDICATOR}A", + ] + for resp in responses: + await agent.worker._llm.generate([Prompt.user(resp)]) + + result = await agent.voter.send("Choose A or B") + + assert "A" in result + assert agent.voter.last_result is not None + assert agent.voter.last_result.converged is True + # A has 4 votes, B has 1, margin = 3 + assert agent.voter.last_result.margin >= 3 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_max_samples_fallback(fast_agent): + """Test that plurality is used when max_samples reached without k-margin.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=10, max_samples=10) + async def agent_function(): + async with fast.run() as agent: + # With passthrough, all responses will be the same (last primed) + # Set k=10 so we need 10 identical samples to converge + # But we'll only get 10 samples total, so margin will be 10 (all same) + # This tests that we get a result even at the limit + await agent.worker._llm.generate( + [Prompt.user(f"{FIXED_RESPONSE_INDICATOR}consistent")] + ) + + result = await agent.voter.send("Choose") + + # Should get the consistent response + assert "consistent" in result + assert agent.voter.last_result is not None + assert agent.voter.last_result.total_samples <= 10 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_red_flag_config(fast_agent): + """Test that red_flag_max_length configuration is accepted.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=2, red_flag_max_length=100) + async def agent_function(): + async with fast.run() as agent: + # Prime a short response that won't be red-flagged + short_response = f"{FIXED_RESPONSE_INDICATOR}ok" + await agent.worker._llm.generate([Prompt.user(short_response)]) + + result = await agent.voter.send("Question") + + # Should succeed with 
short response + assert "ok" in result + assert agent.voter.last_result is not None + assert agent.voter.last_result.discarded_samples == 0 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_match_strategy_normalized(fast_agent): + """Test that normalized matching ignores whitespace and case.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=2, match_strategy="normalized") + async def agent_function(): + async with fast.run() as agent: + # These should all be treated as the same response + responses = [ + f"{FIXED_RESPONSE_INDICATOR}Hello World", + f"{FIXED_RESPONSE_INDICATOR}hello world", + f"{FIXED_RESPONSE_INDICATOR} HELLO WORLD ", + ] + for resp in responses: + await agent.worker._llm.generate([Prompt.user(resp)]) + + await agent.voter.send("Greet") + + # All 3 should count as same vote, achieving k=2 margin immediately + assert agent.voter.last_result is not None + assert agent.voter.last_result.converged is True + # With normalized matching, all 3 responses are identical + assert len(agent.voter.last_result.votes) == 1 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_match_strategy_structured(fast_agent): + """Test that structured matching compares JSON structurally.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=2, match_strategy="structured") + async def agent_function(): + async with fast.run() as agent: + # These JSON objects are structurally identical despite key order + responses = [ + f'{FIXED_RESPONSE_INDICATOR}{{"a": 1, "b": 2}}', + f'{FIXED_RESPONSE_INDICATOR}{{"b": 2, "a": 1}}', + f'{FIXED_RESPONSE_INDICATOR}{{"a": 1, "b": 2}}', + ] + for resp in responses: + await agent.worker._llm.generate([Prompt.user(resp)]) + + await agent.voter.send("Return JSON") + + assert agent.voter.last_result is not None + assert agent.voter.last_result.converged is True + # With structured matching, all 3 responses are identical + assert len(agent.voter.last_result.votes) == 1 + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_voting_result_tracking(fast_agent): + """Test that voting results are properly tracked and accessible.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=2, max_samples=10) + async def agent_function(): + async with fast.run() as agent: + # Prime varied responses - alpha appears twice consecutively + # so it will win with k=2 margin after 2 samples + responses = [ + f"{FIXED_RESPONSE_INDICATOR}alpha", + f"{FIXED_RESPONSE_INDICATOR}alpha", + ] + for resp in responses: + await agent.worker._llm.generate([Prompt.user(resp)]) + + await agent.voter.send("Choose") + + # Verify result tracking + result = agent.voter.last_result + assert result is not None + assert "alpha" in result.winner + assert result.total_samples >= 2 + assert result.votes.get("alpha", 0) >= 2 + assert result.margin >= 2 + assert result.converged is True + + await agent_function() + + +@pytest.mark.integration +@pytest.mark.asyncio +async def test_single_response_wins_immediately(fast_agent): + """Test that with k=1, a single unique response wins immediately.""" + fast = fast_agent + + @fast.agent(name="worker", model="passthrough") + @fast.maker(name="voter", worker="worker", k=1) + async def agent_function(): + async with fast.run() as agent: + # Single 
response should win with k=1 + await agent.worker._llm.generate( + [Prompt.user(f"{FIXED_RESPONSE_INDICATOR}winner")] + ) + + result = await agent.voter.send("Quick test") + + assert "winner" in result + assert agent.voter.last_result is not None + assert agent.voter.last_result.total_samples == 1 + assert agent.voter.last_result.converged is True + + await agent_function()
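Reviewer note (not part of the diff): the voting rule is easy to sanity-check in isolation. Below is a minimal standalone sketch of first-to-ahead-by-k run against a stand-in sampler, a biased coin rather than a worker agent. The names first_to_ahead_by_k and step_error_bound are illustrative, not fast-agent API, and the bound assumes just two candidate answers and independent samples.

import random
from collections import defaultdict


def first_to_ahead_by_k(sample, k: int, max_samples: int = 50) -> str:
    """Draw samples until one answer leads every other answer by k votes."""
    votes: dict[str, int] = defaultdict(int)
    for _ in range(max_samples):
        votes[sample()] += 1
        ranked = sorted(votes.values(), reverse=True)
        runner_up = ranked[1] if len(ranked) > 1 else 0
        if ranked[0] - runner_up >= k:
            break
    return max(votes, key=votes.get)  # plurality fallback at the cap


def step_error_bound(p: float, k: int) -> float:
    """Gambler's-ruin probability that the wrong answer gets k votes ahead
    first, for two answers and per-sample accuracy p: r^k / (1 + r^k),
    where r = (1 - p) / p."""
    r = (1 - p) / p
    return r**k / (1 + r**k)


if __name__ == "__main__":
    p = 0.95  # assumed per-sample accuracy of the worker

    def coin() -> str:
        return "right" if random.random() < p else "wrong"

    for k in (1, 2, 3):
        trials = 20_000
        wrong = sum(first_to_ahead_by_k(coin, k) == "wrong" for _ in range(trials))
        print(f"k={k}: empirical step error {wrong / trials:.5f}, "
              f"bound {step_error_bound(p, k):.5f}")

For p = 0.95 the bound gives roughly 5e-2, 2.8e-3, and 1.5e-4 for k = 1, 2, 3, which is exactly the jump from "95% per step" to "99.9%+ per step" that the example docstring's math relies on.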
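A companion sketch for the claim in the maker_agent module docstring that k needs to grow only logarithmically with the number of steps: find the smallest k whose bounded per-step error still leaves a target end-to-end success rate after s steps. This uses the same simplified two-answer assumption as above; min_k is a hypothetical helper, not repo API.

def min_k(p: float, steps: int, target: float = 0.99) -> int:
    """Smallest margin k such that (1 - step_error)^steps >= target."""
    allowed = 1 - target ** (1 / steps)  # largest tolerable per-step error
    r = (1 - p) / p
    k = 1
    while r**k / (1 + r**k) > allowed:  # per-step error bound from above
        k += 1
    return k


if __name__ == "__main__":
    for steps in (100, 10_000, 1_000_000):
        print(f"{steps:>9} steps -> k = {min_k(0.95, steps)}")

At p = 0.95 and a 99% end-to-end target this prints k = 4, 5, and 7 for 100, 10,000, and 1,000,000 steps: the per-step sample budget creeps up slowly while the chain length grows by four orders of magnitude.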
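One more quick illustration, this time of how the three MatchStrategy values bucket responses for voting. The normalize function below locally re-implements the logic of MakerAgent._normalize_response for demonstration; it is not imported from the package.

import json


def normalize(response: str, strategy: str) -> str:
    """Mirror of MakerAgent._normalize_response for the built-in strategies."""
    if strategy == "normalized":
        return " ".join(response.lower().split())
    if strategy == "structured":
        try:
            return json.dumps(json.loads(response), sort_keys=True)
        except json.JSONDecodeError:
            return response
    return response  # "exact": character-for-character


# normalized: case and whitespace differences collapse into one vote bucket
assert normalize("Hello  World", "normalized") == normalize(" hello world ", "normalized")
# structured: JSON key order is irrelevant
assert normalize('{"a": 1, "b": 2}', "structured") == normalize('{"b": 2, "a": 1}', "structured")
# exact: any character difference is a separate bucket
assert normalize("Hello World", "exact") != normalize("hello world", "exact")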