
Commit 6ea793f

Add max_tasks_per_child config
1 parent 9f8b3c4 commit 6ea793f

File tree

openevolve/config.py
openevolve/process_parallel.py

2 files changed: +44 -21 lines changed

openevolve/config.py
Lines changed: 3 additions & 0 deletions

@@ -367,6 +367,9 @@ class Config:
     convergence_threshold: float = 0.001
     early_stopping_metric: str = "combined_score"
 
+    # Parallel controller settings
+    max_tasks_per_child: Optional[int] = None
+
     @classmethod
     def from_yaml(cls, path: Union[str, Path]) -> "Config":
         """Load configuration from a YAML file"""

openevolve/process_parallel.py
Lines changed: 41 additions & 21 deletions

@@ -8,8 +8,9 @@
 import pickle
 import signal
 import time
-from concurrent.futures import ProcessPoolExecutor, Future, TimeoutError as FutureTimeoutError
-from dataclasses import dataclass, asdict
+from concurrent.futures import Future, ProcessPoolExecutor
+from concurrent.futures import TimeoutError as FutureTimeoutError
+from dataclasses import asdict, dataclass
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
 
@@ -37,11 +38,11 @@ class SerializableResult:
 def _worker_init(config_dict: dict, evaluation_file: str, parent_env: dict = None) -> None:
     """Initialize worker process with necessary components"""
     import os
-
+
     # Set environment from parent process
     if parent_env:
         os.environ.update(parent_env)
-
+
     global _worker_config
     global _worker_evaluation_file
     global _worker_evaluator
@@ -55,8 +56,8 @@ def _worker_init(config_dict: dict, evaluation_file: str, parent_env: dict = None) -> None:
         DatabaseConfig,
         EvaluatorConfig,
         LLMConfig,
-        PromptConfig,
         LLMModelConfig,
+        PromptConfig,
     )
 
     # Reconstruct model objects
@@ -125,7 +126,7 @@ def _lazy_init_worker_components():
         evaluator_llm,
         evaluator_prompt,
         database=None,  # No shared database in worker
-        suffix=getattr(_worker_config, 'file_suffix', '.py'),
+        suffix=getattr(_worker_config, "file_suffix", ".py"),
     )
 
 
@@ -201,7 +202,7 @@ def _run_iteration_worker(
 
     # Parse response based on evolution mode
    if _worker_config.diff_based_evolution:
-        from openevolve.utils.code_utils import extract_diffs, apply_diff, format_diff_summary
+        from openevolve.utils.code_utils import apply_diff, extract_diffs, format_diff_summary
 
        diff_blocks = extract_diffs(llm_response)
        if not diff_blocks:
@@ -275,7 +276,14 @@ def _run_iteration_worker(
 class ProcessParallelController:
     """Controller for process-based parallel evolution"""
 
-    def __init__(self, config: Config, evaluation_file: str, database: ProgramDatabase, evolution_tracer=None, file_suffix: str = ".py"):
+    def __init__(
+        self,
+        config: Config,
+        evaluation_file: str,
+        database: ProgramDatabase,
+        evolution_tracer=None,
+        file_suffix: str = ".py",
+    ):
         self.config = config
         self.evaluation_file = evaluation_file
         self.database = database
@@ -298,7 +306,7 @@ def _serialize_config(self, config: Config) -> dict:
 
         # The asdict() call itself triggers the deepcopy which tries to serialize novelty_llm. Remove it first.
         config.database.novelty_llm = None
-
+
         return {
             "llm": {
                 "models": [asdict(m) for m in config.llm.models],
@@ -334,16 +342,21 @@ def start(self) -> None:
 
         # Pass current environment to worker processes
         import os
+
         current_env = dict(os.environ)
-
+
         # Create process pool with initializer
         self.executor = ProcessPoolExecutor(
             max_workers=self.num_workers,
             initializer=_worker_init,
             initargs=(config_dict, self.evaluation_file, current_env),
+            max_tasks_per_child=self.config.max_tasks_per_child,
         )
 
-        logger.info(f"Started process pool with {self.num_workers} processes")
+        logger.info(
+            f"Started process pool with {self.num_workers} processes "
+            f"and max {self.config.max_tasks_per_child} tasks per child"
+        )
 
     def stop(self) -> None:
         """Stop the process pool"""
@@ -426,7 +439,9 @@ async def run_evolution(
         completed_iterations = 0
 
         # Island management
-        programs_per_island = self.config.database.programs_per_island or max(1, max_iterations // (self.config.database.num_islands * 10))
+        programs_per_island = self.config.database.programs_per_island or max(
+            1, max_iterations // (self.config.database.num_islands * 10)
+        )
         current_island_counter = 0
 
         # Early stopping tracking
@@ -480,15 +495,19 @@
                 # Store artifacts
                 if result.artifacts:
                     self.database.store_artifacts(child_program.id, result.artifacts)
-
+
                 # Log evolution trace
                 if self.evolution_tracer:
                     # Retrieve parent program for trace logging
-                    parent_program = self.database.get(result.parent_id) if result.parent_id else None
+                    parent_program = (
+                        self.database.get(result.parent_id) if result.parent_id else None
+                    )
                     if parent_program:
                         # Determine island ID
-                        island_id = child_program.metadata.get("island", self.database.current_island)
-
+                        island_id = child_program.metadata.get(
+                            "island", self.database.current_island
+                        )
+
                         self.evolution_tracer.log_trace(
                             iteration=completed_iteration,
                             parent_program=parent_program,
@@ -500,7 +519,7 @@
                             metadata={
                                 "iteration_time": result.iteration_time,
                                 "changes": child_program.metadata.get("changes", ""),
-                            }
+                            },
                         )
 
                         # Log prompts
@@ -590,8 +609,10 @@
 
                 # Check target score
                 if target_score is not None and child_program.metrics:
-                    if ('combined_score' in child_program.metrics and
-                        child_program.metrics['combined_score'] >= target_score):
+                    if (
+                        "combined_score" in child_program.metrics
+                        and child_program.metrics["combined_score"] >= target_score
+                    ):
                         logger.info(
                             f"Target score {target_score} reached at iteration {completed_iteration}"
                         )
@@ -701,8 +722,7 @@ def _submit_iteration(
         # Use thread-safe sampling that doesn't modify shared state
         # This fixes the race condition from GitHub issue #246
         parent, inspirations = self.database.sample_from_island(
-            island_id=target_island,
-            num_inspirations=self.config.prompt.num_top_programs
+            island_id=target_island, num_inspirations=self.config.prompt.num_top_programs
         )
 
         # Create database snapshot
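For reference, the max_tasks_per_child argument that start() now forwards to ProcessPoolExecutor is a standard-library feature added in Python 3.11: a worker process exits after completing that many tasks and is replaced with a fresh process, while None (the config default) keeps workers alive for the lifetime of the pool. Per the Python docs, the feature is incompatible with the "fork" start method, so "spawn" is used by default when a limit is set. A minimal standalone sketch of the recycling behavior, independent of openevolve:

import os
from concurrent.futures import ProcessPoolExecutor

def worker_pid(_):
    # Return the PID of the worker process that handled this task.
    return os.getpid()

if __name__ == "__main__":
    # Requires Python 3.11+. With max_tasks_per_child=2 and a single worker,
    # the pool replaces the worker after every two completed tasks, so the
    # reported PID changes every two entries.
    with ProcessPoolExecutor(max_workers=1, max_tasks_per_child=2) as pool:
        pids = list(pool.map(worker_pid, range(6)))
    print(pids)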
