diff --git a/api/endpoints/validator.py b/api/endpoints/validator.py index 3ca5e29d7..e24c28bda 100644 --- a/api/endpoints/validator.py +++ b/api/endpoints/validator.py @@ -22,7 +22,8 @@ get_all_evaluation_runs_in_evaluation_id, create_evaluation_run_log, check_if_evaluation_run_logs_exist from models.agent import Agent, AgentStatus from models.evaluation import Evaluation, EvaluationStatus -from models.evaluation_run import EvaluationRunStatus, EvaluationRunLogType +from models.evaluation_run import EvaluationRunStatus, EvaluationRunLogType, EvaluationRunErrorCode +from models.evaluation_set import EvaluationSetGroup from models.problem import ProblemTestResult from utils.bittensor import validate_signed_timestamp from utils.s3 import download_text_file_from_s3 @@ -355,7 +356,14 @@ async def validator_request_evaluation( agent_code = await download_text_file_from_s3(f"{agent_id}/agent.py") evaluation_runs = [ValidatorRequestEvaluationResponseEvaluationRun(evaluation_run_id=evaluation_run.evaluation_run_id, problem_name=evaluation_run.problem_name) for evaluation_run in evaluation_runs] - return ValidatorRequestEvaluationResponse(agent_code=agent_code, evaluation_runs=evaluation_runs) + # Determine pass threshold for screeners (None for validators) + pass_threshold = None + if validator.current_agent.status == AgentStatus.screening_1: + pass_threshold = config.SCREENER_1_THRESHOLD + elif validator.current_agent.status == AgentStatus.screening_2: + pass_threshold = config.SCREENER_2_THRESHOLD + + return ValidatorRequestEvaluationResponse(agent_code=agent_code, evaluation_runs=evaluation_runs, pass_threshold=pass_threshold) def record_validator_heartbeat(validator: Validator, system_metrics: SystemMetrics | None = None) -> None: validator.time_last_heartbeat = datetime.now(timezone.utc) @@ -615,6 +623,52 @@ async def validator_disconnect( +# /validator/skip-evaluation-run +# Used to mark an evaluation run as skipped when a screener (validator) cancels the evaluation +@router.post("/skip-evaluation-run") +@handle_validator_http_exceptions +async def validator_skip_evaluation_run( + request: ValidatorSkipEvaluationRunRequest, + validator: Validator = Depends(get_request_validator_with_lock) +) -> ValidatorSkipEvaluationRunResponse: + """Mark an evaluation run as skipped (early termination).""" + + if validator.current_evaluation_id is None: + raise HTTPException( + status_code=409, + detail="This validator is not currently running an evaluation, and therefore cannot skip an evaluation run." + ) + + evaluation_run = await get_evaluation_run_by_id(request.evaluation_run_id) + + if evaluation_run is None: + raise HTTPException( + status_code=404, + detail=f"Evaluation run with ID {request.evaluation_run_id} does not exist." + ) + + if evaluation_run.evaluation_id != validator.current_evaluation_id: + raise HTTPException( + status_code=403, + detail=f"The evaluation run with ID {request.evaluation_run_id} is not associated with the validator's current evaluation." 
+ ) + + # Ensure the evaluation run is not already terminal + if evaluation_run.status in (EvaluationRunStatus.finished, EvaluationRunStatus.error, EvaluationRunStatus.skipped): + logger.info(f"Validator '{validator.name}' skip-evaluation-run called on terminal run (status={evaluation_run.status})") + return ValidatorSkipEvaluationRunResponse() + + evaluation_run.status = EvaluationRunStatus.skipped + evaluation_run.finished_or_errored_at = datetime.now(timezone.utc) + await update_evaluation_run_by_id(evaluation_run) + + logger.info(f"Validator '{validator.name}' skipped an evaluation run") + logger.info(f"  Evaluation run ID: {request.evaluation_run_id}") + + return ValidatorSkipEvaluationRunResponse() + + + # /validator/finish-evaluation @router.post("/finish-evaluation") @handle_validator_http_exceptions @@ -630,15 +684,16 @@ async def validator_finish_evaluation( detail="This validator is not currently running an evaluation, and therefore cannot request to finish an evaluation." ) + # Make sure that all evaluation runs have either finished, errored, or skipped # Record a heartbeat for the validator record_validator_heartbeat(validator) # Make sure that all evaluation runs have either finished or errored evaluation_runs = await get_all_evaluation_runs_in_evaluation_id(validator.current_evaluation_id) - if any(evaluation_run.status not in [EvaluationRunStatus.finished, EvaluationRunStatus.error] for evaluation_run in evaluation_runs): + if any(evaluation_run.status not in [EvaluationRunStatus.finished, EvaluationRunStatus.error, EvaluationRunStatus.skipped] for evaluation_run in evaluation_runs): raise HTTPException( status_code=409, - detail="Not all evaluation runs associated with the evaluation that this validator is currently running have either finished or errored. Did you forget to send an update-evaluation-run?" + detail="Not all evaluation runs associated with the evaluation that this validator is currently running have finished, errored, or been skipped. Did you forget to send an update-evaluation-run?"
) @@ -731,4 +786,19 @@ async def handle_evaluation_if_finished(evaluation_id: UUID) -> None: # raise ValueError(f"Invalid agent status: {agent.status}, this should never happen") return - await update_agent_status(hydrated_evaluation.agent_id, new_agent_status) \ No newline at end of file + await update_agent_status(hydrated_evaluation.agent_id, new_agent_status) + + elif hydrated_evaluation.status == EvaluationStatus.failure: + if hydrated_evaluation.evaluation_set_group in (EvaluationSetGroup.screener_1, EvaluationSetGroup.screener_2): + evaluation_runs = await get_all_evaluation_runs_in_evaluation_id(evaluation_id) + has_syntax_penalty = any( + run.error_code is not None and run.error_code == EvaluationRunErrorCode.AGENT_INVALID_PATCH + for run in evaluation_runs + ) + + if has_syntax_penalty: + agent = await get_agent_by_id(hydrated_evaluation.agent_id) + if agent.status == AgentStatus.screening_1: + await update_agent_status(hydrated_evaluation.agent_id, AgentStatus.failed_screening_1) + elif agent.status == AgentStatus.screening_2: + await update_agent_status(hydrated_evaluation.agent_id, AgentStatus.failed_screening_2) diff --git a/api/endpoints/validator_models.py b/api/endpoints/validator_models.py index 2ddda50bb..5a4398c4e 100644 --- a/api/endpoints/validator_models.py +++ b/api/endpoints/validator_models.py @@ -47,6 +47,7 @@ class ValidatorRequestEvaluationResponseEvaluationRun(BaseModel): # :( class ValidatorRequestEvaluationResponse(BaseModel): agent_code: str evaluation_runs: List[ValidatorRequestEvaluationResponseEvaluationRun] + pass_threshold: Optional[float] = None # None for validators @@ -74,6 +75,13 @@ class ValidatorUpdateEvaluationRunRequest(BaseModel): class ValidatorUpdateEvaluationRunResponse(BaseModel): pass +# Models for the new endpoint that marks a run as skipped when a screener (validator) cancels the evaluation +class ValidatorSkipEvaluationRunRequest(BaseModel): + evaluation_run_id: UUID + +class ValidatorSkipEvaluationRunResponse(BaseModel): + pass + class ValidatorDisconnectRequest(BaseModel): diff --git a/api/src/backend/postgres_schema.sql b/api/src/backend/postgres_schema.sql index f45d2210a..013f000cb 100644 --- a/api/src/backend/postgres_schema.sql +++ b/api/src/backend/postgres_schema.sql @@ -27,7 +27,8 @@ BEGIN 'initializing_eval', 'running_eval', 'finished', - 'error' + 'error', + 'skipped' ); END IF; @@ -224,12 +225,20 @@ FROM evaluation_runs; -- Second view: Evaluations hydrated view -- Evaluations with aggregated status and average score +-- +-- Status logic: +-- 1. 'failure' — Syntax penalty: screener evaluation where any run hit AGENT_INVALID_PATCH (1040). Agent is penalized. +-- 2. 'success' — Clean completion: every run finished, was skipped, or errored with an agent-level error (1000-1999). +-- The evaluation infra worked; score the agent normally. +-- 3. 'failure' — Infra failure: all runs are done but at least one had a non-agent error (2000+). Re-queue. +-- 4. 'running' — Some runs are still in progress.
CREATE OR REPLACE VIEW evaluations_hydrated AS SELECT evaluations.*, (CASE - WHEN EVERY(erh.status = 'finished' OR (erh.status = 'error' AND erh.error_code BETWEEN 1000 AND 1999)) THEN 'success' - WHEN EVERY(erh.status IN ('finished', 'error')) THEN 'failure' + WHEN evaluations.evaluation_set_group IN ('screener_1', 'screener_2') AND bool_or(erh.error_code = 1040) THEN 'failure' + WHEN EVERY(erh.status IN ('finished', 'skipped') OR (erh.status = 'error' AND erh.error_code BETWEEN 1000 AND 1999)) THEN 'success' + WHEN EVERY(erh.status IN ('finished', 'error', 'skipped')) THEN 'failure' ELSE 'running' END)::EvaluationStatus AS status, COUNT(*) FILTER (WHERE erh.solved)::float / COUNT(*) AS score @@ -531,3 +540,14 @@ CREATE TRIGGER tr_refresh_agent_scores_unapproved_agent_ids AFTER INSERT OR UPDATE OR DELETE ON unapproved_agent_ids FOR EACH ROW EXECUTE PROCEDURE refresh_agent_scores(); + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 FROM pg_enum + WHERE enumlabel = 'skipped' + AND enumtypid = (SELECT oid FROM pg_type WHERE typname = 'evaluationrunstatus') + ) THEN + ALTER TYPE EvaluationRunStatus ADD VALUE 'skipped'; + END IF; +END $$; diff --git a/evaluator/problem_suites/polyglot/polyglot_suite.py b/evaluator/problem_suites/polyglot/polyglot_suite.py index bd9db98ac..1c8fa9d85 100644 --- a/evaluator/problem_suites/polyglot/polyglot_suite.py +++ b/evaluator/problem_suites/polyglot/polyglot_suite.py @@ -18,7 +18,7 @@ from utils.git import init_local_repo_with_initial_commit from evaluator.sandbox.sandbox_manager import SandboxManager from evaluator.problem_suites.problem_suite import ProblemSuite, ProblemSuiteName -from utils.diff import get_file_diff, apply_diff_to_local_repo, validate_diff_for_local_repo +from utils.diff import get_file_diff, apply_diff_to_local_repo, validate_diff_for_local_repo, validate_patched_files_syntax @@ -147,7 +147,13 @@ def _on_mount(temp_dir: str): # Apply the patch apply_diff_to_local_repo(patch, sandbox_repo_dir) - + # Syntax-check the patched files + is_valid, error_message = validate_patched_files_syntax(sandbox_repo_dir) + if not is_valid: + raise EvaluationRunException( + EvaluationRunErrorCode.AGENT_INVALID_PATCH, + f"{EvaluationRunErrorCode.AGENT_INVALID_PATCH.get_error_message()}: {error_message}" + ) return sandbox_manager.initialize_sandbox( name=f"eval-sandbox-{problem.name}-{evaluation_run_id}", diff --git a/evaluator/problem_suites/swebench_verified/swebench_verified_suite.py b/evaluator/problem_suites/swebench_verified/swebench_verified_suite.py index 197b50748..358d5a88a 100644 --- a/evaluator/problem_suites/swebench_verified/swebench_verified_suite.py +++ b/evaluator/problem_suites/swebench_verified/swebench_verified_suite.py @@ -11,7 +11,7 @@ from pydantic import BaseModel from utils.docker import get_docker_client from typing import Any, Dict, List, Tuple, Optional -from utils.diff import validate_diff_for_local_repo +from utils.diff import validate_diff_for_local_repo, apply_diff_to_local_repo, validate_patched_files_syntax from evaluator.models import EvaluationRunException from swebench.harness.constants import SWEbenchInstance from utils.temp import create_temp_dir, delete_temp_dir @@ -186,7 +186,14 @@ def initialize_eval_sandbox( f"{EvaluationRunErrorCode.AGENT_INVALID_PATCH.get_error_message()}: {error_message}" ) - + # Syntax-check the patched files + apply_diff_to_local_repo(patch, temp_dir) + is_valid, error_message = validate_patched_files_syntax(temp_dir) + if not is_valid: + raise EvaluationRunException( + 
EvaluationRunErrorCode.AGENT_INVALID_PATCH, + f"{EvaluationRunErrorCode.AGENT_INVALID_PATCH.get_error_message()}: {error_message}" + ) swebench_instance = problem.userdata diff --git a/evaluator/sandbox/sandbox_manager.py b/evaluator/sandbox/sandbox_manager.py index cbc5ee639..2e641dd87 100644 --- a/evaluator/sandbox/sandbox_manager.py +++ b/evaluator/sandbox/sandbox_manager.py @@ -160,6 +160,20 @@ def initialize_sandbox( + def cleanup_sandbox(self, sandbox: Sandbox): + """Clean up a sandbox's container and temp directory.""" + try: + sandbox.container.stop() + sandbox.container.remove() + except Exception: + pass + try: + delete_temp_dir(sandbox.temp_dir) + except Exception: + pass + + + def run_sandbox( self, sandbox: Sandbox diff --git a/models/evaluation_run.py b/models/evaluation_run.py index 891c28f7c..923408f39 100644 --- a/models/evaluation_run.py +++ b/models/evaluation_run.py @@ -60,6 +60,7 @@ class EvaluationRunStatus(str, Enum): running_eval = 'running_eval' finished = 'finished' error = 'error' + skipped = 'skipped' diff --git a/utils/diff.py b/utils/diff.py index 216d92175..8599437df 100644 --- a/utils/diff.py +++ b/utils/diff.py @@ -1,5 +1,6 @@ """Utilities for computing diffs between files.""" +import ast import os import tempfile import subprocess @@ -8,15 +9,14 @@ from typing import Tuple, Optional - def get_file_diff(old_path, new_path) -> str: """ Gets the diff between two files. - + Args: old_path: The path to the old file new_path: The path to the new file - + Returns: The diff between the two files, expressed as a diff of the old file, as a string. """ @@ -28,13 +28,9 @@ def get_file_diff(old_path, new_path) -> str: missing.append(new_path) if missing: logger.fatal(f"File(s) not found for diff: {', '.join(missing)}") - + # Use diff command - result = subprocess.run( - ["diff", "-u", old_path, new_path], - capture_output=True, - text=True - ) + result = subprocess.run(["diff", "-u", old_path, new_path], capture_output=True, text=True) # Check if the diff was generated successfully # `diff -u` return codes: @@ -53,39 +49,33 @@ def get_file_diff(old_path, new_path) -> str: filename = os.path.basename(old_path) lines[0] = f"--- {filename}" lines[1] = f"+++ {filename}" - - return "\n".join(lines) + return "\n".join(lines) def validate_diff_for_local_repo(diff, local_repo_dir) -> Tuple[bool, Optional[str]]: """ Validates if a diff string is valid and can be applied to a local repository. - + Args: diff: The diff string to validate local_repo_dir: The local repository directory - + Returns: (is_valid: bool, error_message: Optional[str]) """ - + # Write diff to temp file with tempfile.NamedTemporaryFile(mode="w", suffix=".diff", delete=False) as f: f.write(diff) diff_file = f.name - + # Use `git apply --check` to validate without applying - result = subprocess.run( - ["git", "apply", "--check", diff_file], - cwd=local_repo_dir, - capture_output=True, - text=True - ) + result = subprocess.run(["git", "apply", "--check", diff_file], cwd=local_repo_dir, capture_output=True, text=True) # Delete the temp file os.unlink(diff_file) - + # Check if the diff was applied successfully if result.returncode == 0: return True, None @@ -93,11 +83,10 @@ def validate_diff_for_local_repo(diff, local_repo_dir) -> Tuple[bool, Optional[s return False, result.stderr.strip() - def apply_diff_to_local_repo(diff, local_repo_dir) -> None: """ Applies a diff string to files in the source directory. 
- + Args: diff: The diff string to apply local_repo_dir: The local repository directory @@ -107,18 +96,55 @@ def apply_diff_to_local_repo(diff, local_repo_dir) -> None: with tempfile.NamedTemporaryFile(mode="w", suffix=".diff", delete=False) as f: f.write(diff) diff_file = f.name - + # Use `git apply` to apply the diff - result = subprocess.run( - ["git", "apply", diff_file], - cwd=local_repo_dir, - capture_output=True, - text=True - ) + result = subprocess.run(["git", "apply", diff_file], cwd=local_repo_dir, capture_output=True, text=True) # Delete the temp file os.unlink(diff_file) # Check if the diff was applied successfully if result.returncode != 0: - logger.fatal(f"Failed to apply diff to {local_repo_dir}: {result.stderr.strip()}") \ No newline at end of file + logger.fatal(f"Failed to apply diff to {local_repo_dir}: {result.stderr.strip()}") + + +def validate_patched_files_syntax(repo_dir: str) -> Tuple[bool, Optional[str]]: + """ + After a patch has been applied, check that modified files have valid syntax. + Supports Python (.py) and JavaScript (.js, .mjs) files. + + Args: + repo_dir: The repository directory where the patch was applied + + Returns: + (is_valid: bool, error_message: Optional[str]) + """ + result = subprocess.run(["git", "diff", "--name-only"], cwd=repo_dir, capture_output=True, text=True) + modified_files = [f.strip() for f in result.stdout.strip().splitlines() if f.strip()] + + errors = [] + for filepath in modified_files: + full_path = os.path.join(repo_dir, filepath) + if not os.path.exists(full_path): + continue + + if filepath.endswith(".py"): + try: + with open(full_path, "r") as f: + source = f.read() + ast.parse(source, filename=filepath) + except SyntaxError as e: + errors.append(f"{filepath}:{e.lineno}: {e.msg}") + + elif filepath.endswith((".js", ".mjs")): + with open(full_path, "r") as f: + source = f.read() + result = subprocess.run( + ["node", "--input-type=module", "--check"], input=source, capture_output=True, text=True + ) + if result.returncode != 0: + errors.append(f"{filepath}: {result.stderr.strip()}") + + if errors: + return False, "Patched files have syntax errors:\n" + "\n".join(errors) + return True, None diff --git a/utils/validator_hotkeys.py b/utils/validator_hotkeys.py index a681bd15f..99c168952 100644 --- a/utils/validator_hotkeys.py +++ b/utils/validator_hotkeys.py @@ -17,7 +17,8 @@ {"name": "Alex's Validator (1)", "hotkey": "5HpMvcM593HmizCA3ARLNifxjPSLbN3M5RHYy4GiEqmB3x9n"}, {"name": "Alex's Validator (2)", "hotkey": "5HNpAXVzWaW4yD9UqH5sXFPt1gPFqNTViDy61NdiViyDQiTQ"}, {"name": "Alex's Validator (3)", "hotkey": "5GgqnYQ3QwnCcmxiGatXS3rrHGmkqU3cMSjQFSdLKHDmxyB6"}, - {"name": "Shak's Validator", "hotkey": "5F26aNVC3rZVNbH36DWdZzxPVH17iBNGD14Wtb4nQem742Q7"} + {"name": "Shak's Validator", "hotkey": "5F26aNVC3rZVNbH36DWdZzxPVH17iBNGD14Wtb4nQem742Q7"}, + {"name": "Abe's Validator", "hotkey": "5G699LghHWA18yEPq8NpX9gYi8ZDM3fy2BJvSvYWqtt2DHGE"} ] def is_validator_hotkey_whitelisted(validator_hotkey: str) -> bool: diff --git a/validator/main.py b/validator/main.py index d5f84ab6a..86512d866 100644 --- a/validator/main.py +++ b/validator/main.py @@ -12,6 +12,7 @@ import utils.logger as logger import validator.config as config +from dataclasses import dataclass from typing import Any, Dict from api.endpoints.validator_models import * from models.problem import ProblemTestResultStatus @@ -39,6 +40,13 @@ sandbox_manager = None problem_suites = [] +# Result from a single evaluation run, +# Used to decide whether to cancel remaining runs 
+@dataclass +class RunOutcome: + solved: bool = False # Did all tests pass? + had_syntax_error: bool = False # Was there an AGENT_INVALID_PATCH error? + # Disconnect from the Ridges platform (called when the program exits) @@ -99,6 +107,13 @@ async def update_evaluation_run(evaluation_run_id: UUID, problem_name: str, upda ), bearer_token=session_id, quiet=2) +async def skip_evaluation_run(evaluation_run_id: UUID, problem_name: str): + logger.info(f"Skipping evaluation run {evaluation_run_id} for problem {problem_name} (early termination)...") + + await post_ridges_platform("/validator/skip-evaluation-run", ValidatorSkipEvaluationRunRequest( + evaluation_run_id=evaluation_run_id + ), bearer_token=session_id, quiet=2) + # Truncates a log if required def truncate_logs_if_required(log: str) -> str: @@ -113,7 +128,7 @@ async def _simulate_run_evaluation_run_with_semaphore(evaluation_run_id: UUID, p return await _simulate_run_evaluation_run(evaluation_run_id, problem_name) # Simulate a run of an evaluation run, useful for testing, set SIMULATE_EVALUATION_RUNS=True in .env -async def _simulate_run_evaluation_run(evaluation_run_id: UUID, problem_name: str): +async def _simulate_run_evaluation_run(evaluation_run_id: UUID, problem_name: str) -> RunOutcome: logger.info(f"Starting simulated evaluation run {evaluation_run_id} for problem {problem_name}...") @@ -147,6 +162,7 @@ async def _simulate_run_evaluation_run(evaluation_run_id: UUID, problem_name: st logger.info(f"Finished simulated evaluation run {evaluation_run_id} for problem {problem_name}") + return RunOutcome(solved=True) async def _run_evaluation_run_with_semaphore(evaluation_run_id: UUID, problem_name: str, agent_code: str, semaphore: asyncio.Semaphore): @@ -154,7 +170,7 @@ async def _run_evaluation_run_with_semaphore(evaluation_run_id: UUID, problem_na return await _run_evaluation_run(evaluation_run_id, problem_name, agent_code) # Run an evaluation run -async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_code: str): +async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_code: str) -> RunOutcome: try: # Figure out what problem suite this problem belongs to problem_suite = next((suite for suite in problem_suites if suite.has_problem_name(problem_name)), None) @@ -165,7 +181,7 @@ async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_ "error_code": EvaluationRunErrorCode.VALIDATOR_UNKNOWN_PROBLEM.value, "error_message": f"The problem '{problem_name}' was not found in any problem suite" }) - return + return RunOutcome() # Get the problem problem = problem_suite.get_problem(problem_name) @@ -174,7 +190,9 @@ async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_ logger.info(f"Starting evaluation run {evaluation_run_id} for problem {problem_name}...") - + outcome = RunOutcome() + agent_sandbox = None + eval_sandbox = None try: # Move from pending -> initializing_agent @@ -238,6 +256,19 @@ async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_ "eval_logs": truncate_logs_if_required(eval_logs) }) + outcome.solved = num_passed == len(test_results) and len(test_results) > 0 + + except asyncio.CancelledError: + logger.info(f"Evaluation run {evaluation_run_id} for problem {problem_name} cancelled; cleaning up sandboxes") + + if sandbox_manager is not None: + if agent_sandbox is not None: + await asyncio.shield(asyncio.to_thread(sandbox_manager.cleanup_sandbox, agent_sandbox)) + if eval_sandbox is not None: + await 
asyncio.shield(asyncio.to_thread(sandbox_manager.cleanup_sandbox, eval_sandbox)) + + raise + except EvaluationRunException as e: logger.error(f"Evaluation run {evaluation_run_id} for problem {problem_name} errored: {e}") @@ -246,6 +277,8 @@ async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_ "error_message": e.error_message }) + outcome.had_syntax_error = (e.error_code == EvaluationRunErrorCode.AGENT_INVALID_PATCH) + except Exception as e: logger.error(f"Evaluation run {evaluation_run_id} for problem {problem_name} errored: {EvaluationRunErrorCode.VALIDATOR_INTERNAL_ERROR.get_error_message()}: {e}") logger.error(traceback.format_exc()) @@ -258,15 +291,23 @@ async def _run_evaluation_run(evaluation_run_id: UUID, problem_name: str, agent_ logger.info(f"Finished evaluation run {evaluation_run_id} for problem {problem_name}") + return outcome + except asyncio.CancelledError: + raise except Exception as e: logger.error(f"Error in _run_evaluation_run(): {type(e).__name__}: {e}") logger.error(traceback.format_exc()) os._exit(1) + + return RunOutcome() # Run an evaluation, automatically dispatches all runs to either _simulate_run_evaluation_run or _run_evaluation_run +# Terminate early when: +# - Threshold is impossible +# - Syntax error is detected async def _run_evaluation(request_evaluation_response: ValidatorRequestEvaluationResponse): logger.info("Received evaluation:") logger.info(f" # of evaluation runs: {len(request_evaluation_response.evaluation_runs)}") @@ -274,24 +315,76 @@ async def _run_evaluation(request_evaluation_response: ValidatorRequestEvaluatio for evaluation_run in request_evaluation_response.evaluation_runs: logger.info(f" {evaluation_run.problem_name}") - + pass_threshold = request_evaluation_response.pass_threshold + logger.info(f" Pass threshold: {pass_threshold}") logger.info("Starting evaluation...") - tasks = [] + task_to_run_info: Dict[asyncio.Task, ValidatorRequestEvaluationResponseEvaluationRun] = {} + semaphore = asyncio.Semaphore(config.MAX_CONCURRENT_EVALUATION_RUNS) for evaluation_run in request_evaluation_response.evaluation_runs: evaluation_run_id = evaluation_run.evaluation_run_id problem_name = evaluation_run.problem_name if config.SIMULATE_EVALUATION_RUNS: - tasks.append(asyncio.create_task(_simulate_run_evaluation_run_with_semaphore(evaluation_run_id, problem_name, semaphore))) + task = asyncio.create_task(_simulate_run_evaluation_run_with_semaphore(evaluation_run_id, problem_name, semaphore)) else: - tasks.append(asyncio.create_task(_run_evaluation_run_with_semaphore(evaluation_run_id, problem_name, request_evaluation_response.agent_code, semaphore))) + task = asyncio.create_task(_run_evaluation_run_with_semaphore(evaluation_run_id, problem_name, request_evaluation_response.agent_code, semaphore)) + + task_to_run_info[task] = evaluation_run + + # Process tasks as they complete (check for early termination) + total = len(task_to_run_info) + solved_count = 0 + completed_count = 0 + pending = set(task_to_run_info.keys()) + skip_reason = None + + while pending: + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + + for task in done: + try: + outcome: RunOutcome = task.result() + completed_count += 1 + if outcome.solved: + solved_count += 1 + + # Only screeners have non-zero pass threshold + if pass_threshold is not None and skip_reason is None: + remaining = total - completed_count + + if outcome.had_syntax_error: + skip_reason = "syntax error penalty" + logger.info(f"Early termination triggered - 
syntax error detected, skipping remaining {len(pending)} runs") + + elif remaining > 0 and (solved_count + remaining) / total < pass_threshold: + skip_reason = "threshold impossible" + logger.info(f"Early termination triggered - threshold impossible ({solved_count + remaining}/{total} < {pass_threshold}), skipping remaining {len(pending)} runs") + + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"Error getting result from task: {e}") + completed_count += 1 + + if skip_reason is not None and pending: + for pending_task in pending: + pending_task.cancel() + + await asyncio.wait(pending) + + for pending_task in pending: + run_info = task_to_run_info[pending_task] + try: + await skip_evaluation_run(run_info.evaluation_run_id, run_info.problem_name) + except Exception as e: + logger.error(f"Error skipping evaluation run {run_info.evaluation_run_id}: {e}") - await asyncio.gather(*tasks) + pending = set() - logger.info("Finished evaluation") + logger.info(f"Finished evaluation (solved={solved_count}/{total}, skip_reason={skip_reason})") await post_ridges_platform("/validator/finish-evaluation", ValidatorFinishEvaluationRequest(), bearer_token=session_id, quiet=1)
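
Review note: the status CASE in the new evaluations_hydrated view can be read as the pure-Python predicate below. This is a sketch written for this review, not code from the repository; the status strings and error-code ranges come from the view itself, and 1040 is the AGENT_INVALID_PATCH code named in its comment.

```python
from typing import List, Optional, Tuple

def evaluation_status(evaluation_set_group: str, runs: List[Tuple[str, Optional[int]]]) -> str:
    """Mirror of the evaluations_hydrated CASE; runs are (status, error_code) pairs."""
    # 1. Syntax penalty: a screener evaluation where any run hit AGENT_INVALID_PATCH (1040).
    if evaluation_set_group in ("screener_1", "screener_2") and any(code == 1040 for _, code in runs):
        return "failure"
    # 2. Clean completion: every run finished, was skipped, or errored with an agent-level error (1000-1999).
    if all(status in ("finished", "skipped") or (status == "error" and code is not None and 1000 <= code <= 1999)
           for status, code in runs):
        return "success"
    # 3. Infra failure: every run is done, but at least one had a non-agent error.
    if all(status in ("finished", "error", "skipped") for status, _ in runs):
        return "failure"
    # 4. Otherwise some runs are still in progress.
    return "running"
```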
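The validate_patched_files_syntax helper added to utils/diff.py is intended to run right after apply_diff_to_local_repo, which is how both suites now call it. A rough usage sketch follows; the throwaway repo, file name, and patch are invented for illustration, and it assumes git is on PATH and the repository's utils package is importable.

```python
import pathlib
import subprocess
import tempfile

from utils.diff import apply_diff_to_local_repo, validate_patched_files_syntax

# Throwaway repo with one tracked Python file (illustrative setup only).
repo = tempfile.mkdtemp()
subprocess.run(["git", "init", "-q", repo], check=True)
pathlib.Path(repo, "mod.py").write_text("def f():\n    return 1\n")
subprocess.run(["git", "-C", repo, "add", "-A"], check=True)

# A patch that applies cleanly but leaves mod.py with a Python syntax error.
bad_patch = """\
--- a/mod.py
+++ b/mod.py
@@ -1,2 +1,2 @@
 def f():
-    return 1
+    return 1 +
"""
apply_diff_to_local_repo(bad_patch, repo)

is_valid, error = validate_patched_files_syntax(repo)
assert not is_valid and "mod.py" in error  # e.g. "mod.py:2: invalid syntax"
```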
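Finally, the early-termination rule added to _run_evaluation reduces to a small amount of arithmetic. The sketch below mirrors that logic as a standalone function, again as an illustration for this review rather than an import from validator/main.py.

```python
from typing import Optional

def early_termination_reason(solved_count: int, completed_count: int, total: int,
                             pass_threshold: Optional[float],
                             had_syntax_error: bool) -> Optional[str]:
    """Return a skip reason once the remaining runs can no longer matter, else None."""
    if pass_threshold is None:  # validators have no threshold and never terminate early
        return None
    if had_syntax_error:  # AGENT_INVALID_PATCH fails the screening outright
        return "syntax error penalty"
    remaining = total - completed_count
    # Even if every remaining run passed, the final score could not reach the threshold.
    if remaining > 0 and (solved_count + remaining) / total < pass_threshold:
        return "threshold impossible"
    return None

# 10 runs at a screener threshold of 0.7: with 0 solved after 4 completed runs,
# the best possible final score is 6/10, so the remaining runs can be skipped.
assert early_termination_reason(0, 4, 10, 0.7, False) == "threshold impossible"
```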