diff --git a/requirements.txt b/requirements.txt index 411458014..cedc51021 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,6 +4,8 @@ pre-commit pytest==7.3.2 pytest-xdist pytest-playwright +dask +distributed browsergym>=0.6.0 joblib>=1.2.0 openai>=1.7,<2 diff --git a/src/agentlab/experiments/graph_execution.py b/src/agentlab/experiments/graph_execution.py new file mode 100644 index 000000000..fe74a1f6a --- /dev/null +++ b/src/agentlab/experiments/graph_execution.py @@ -0,0 +1,96 @@ +from dask import compute, delayed +from browsergym.experiments.loop import ExpArgs +from distributed import LocalCluster, Client + + +def _run(exp_arg: ExpArgs, *dependencies): + return exp_arg.run() + + +def make_dask_client(n_worker): + """Create a Dask client with a LocalCluster backend. + + I struggled to find an appropriate configuration. + I believe it has to do with the interplay of playwright async loop (even if + used in sync mode) and the fact that dask uses asyncio under the hood. + Making sure we use processes and 1 thread per worker seems to work. + + Args: + n_worker: int + Number of workers to create. + + Returns: + A Dask client object. 
+ """ + cluster = LocalCluster( + n_workers=n_worker, + processes=True, + threads_per_worker=1, + ) + + return Client(cluster, asynchronous=True) + + +def execute_task_graph(exp_args_list: list[ExpArgs]): + """Execute a task graph in parallel while respecting dependencies.""" + exp_args_map = {exp_args.exp_id: exp_args for exp_args in exp_args_list} + + tasks = {} + + def get_task(exp_arg: ExpArgs): + if exp_arg.exp_id not in tasks: + dependencies = [get_task(exp_args_map[dep_key]) for dep_key in exp_arg.depends_on] + tasks[exp_arg.exp_id] = delayed(_run)(exp_arg, *dependencies) + return tasks[exp_arg.exp_id] + + for exp_arg in exp_args_list: + get_task(exp_arg) + + task_ids, task_list = zip(*tasks.items()) + results = compute(*task_list) + + return {task_id: result for task_id, result in zip(task_ids, results)} + + +def add_dependencies(exp_args_list: list[ExpArgs], task_dependencies: dict[list] = None): + """Add dependencies to a list of ExpArgs. + + Args: + exp_args_list: list[ExpArgs] + A list of experiments to run. + task_dependencies: dict + A dictionary mapping task names to a list of task names that they + depend on. If None or empty, no dependencies are added. + + Returns: + list[ExpArgs] + The modified exp_args_list with dependencies added. + """ + + if task_dependencies is None or all([len(dep) == 0 for dep in task_dependencies.values()]): + # nothing to be done + return exp_args_list + + exp_args_map = {exp_args.env_args.task_name: exp_args for exp_args in exp_args_list} + if len(exp_args_map) != len(exp_args_list): + raise ValueError( + ( + "Task names are not unique in exp_args_map, " + "you can't run multiple seeds with task dependencies." 
+ ) + ) + + for task_name in exp_args_map.keys(): + if task_name not in task_dependencies: + raise ValueError(f"Task {task_name} is missing from task_dependencies") + + # turn dependencies from task names to exp_ids + for task_name, exp_args in exp_args_map.items(): + + exp_args.depends_on = tuple( + exp_args_map[dep_name].exp_id + for dep_name in task_dependencies[task_name] + if dep_name in exp_args_map # ignore dependencies that are not to be run + ) + + return exp_args_list diff --git a/src/agentlab/experiments/launch_exp.py b/src/agentlab/experiments/launch_exp.py index 0f221f871..77593a59b 100644 --- a/src/agentlab/experiments/launch_exp.py +++ b/src/agentlab/experiments/launch_exp.py @@ -4,7 +4,6 @@ from pathlib import Path from browsergym.experiments.loop import ExpArgs, yield_all_exp_results -from joblib import Parallel, delayed def import_object(path: str): @@ -17,17 +16,45 @@ def import_object(path: str): return obj -def run_experiments(n_jobs, exp_args_list: list[ExpArgs], exp_dir): +def run_experiments(n_jobs, exp_args_list: list[ExpArgs], exp_dir, parallel_backend="joblib"): + """Run a list of ExpArgs in parallel. + + To ensure optimal parallelism, make sure ExpArgs.depends_on is set correctly + and the backend is set to dask. + + Args: + n_jobs: int + Number of parallel jobs. + exp_args_list: list[ExpArgs] + List of ExpArgs objects. + exp_dir: Path + Directory where the experiments will be saved. + parallel_backend: str + Parallel backend to use. Either "joblib", "dask" or "sequential". 
+ + """ logging.info(f"Saving experiments to {exp_dir}") for exp_args in exp_args_list: exp_args.agent_args.prepare() exp_args.prepare(exp_root=exp_dir) - try: - prefer = "processes" - Parallel(n_jobs=n_jobs, prefer=prefer)( - delayed(exp_args.run)() for exp_args in exp_args_list - ) + if parallel_backend == "joblib": + from joblib import Parallel, delayed + + Parallel(n_jobs=n_jobs, prefer="processes")( + delayed(exp_args.run)() for exp_args in exp_args_list + ) + + elif parallel_backend == "dask": + from agentlab.experiments.graph_execution import execute_task_graph, make_dask_client + + with make_dask_client(n_worker=n_jobs): + execute_task_graph(exp_args_list) + elif parallel_backend == "sequential": + for exp_args in exp_args_list: + exp_args.run() + else: + raise ValueError(f"Unknown parallel_backend: {parallel_backend}") finally: # will close servers even if there is an exception or ctrl+c # servers won't be closed if the script is killed with kill -9 or segfaults. @@ -43,17 +70,6 @@ def make_study_dir(exp_root, study_name, add_date=True): return Path(exp_root) / study_name -# def study_agent_on_benchmark(exp_root, study_func, agent, benchmark, extra_kwargs={}): -# exp_args_list = study_func(agent, benchmark, **extra_kwargs) -# study_name = f"{study_func.__name__}_{agent.__class__.__name__}_on_{benchmark}" -# return exp_args_list, make_study_dir(exp_root, study_name) - - -# def make_study(exp_root, study_func, extra_kwargs={}): -# exp_args_list = study_func(**extra_kwargs) -# return exp_args_list, make_study_dir(exp_root, f"{study_func.__name__}") - - def relaunch_study(study_dir: str | Path, relaunch_mode="incomplete_only"): """Return exp_args_list and study_dir @@ -109,112 +125,9 @@ def _yield_incomplete_experiments(exp_root, relaunch_mode="incomplete_only"): raise ValueError(f"Unknown relaunch_mode: {relaunch_mode}") -# def str2dict(arg): -# try: -# return json.loads(arg) -# except json.JSONDecodeError as e: -# raise argparse.ArgumentTypeError(f"Invalid 
dictionary format: {e}") - - def split_path(path: str): """Split a path into a module name and an object name.""" if "/" in path: path = path.replace("/", ".") module_name, obj_name = path.rsplit(".", 1) return module_name, obj_name - - -# def main(): -# from agentlab.experiments.exp_utils import RESULTS_DIR - -# logging.getLogger().setLevel(logging.INFO) - -# parser = argparse.ArgumentParser() -# parser.add_argument( -# "--exp_root", -# default=RESULTS_DIR, -# help="folder where experiments will be saved", -# ) -# parser.add_argument( -# "--n_jobs", -# default=1, -# type=int, -# help="number of parallel jobs", -# ) -# parser.add_argument( -# "--exp_config", -# type=str, -# default="final_run", -# help="Python path to the experiment function to launch", -# ) -# parser.add_argument( -# "--benchmark", -# type=str, -# default="miniwob", -# choices=["miniwob", "workarena.l1", "workarena.l2", "workarena.l3"], -# help="Benchmark to launch", -# ) -# parser.add_argument( -# "--agent_config", -# type=str, -# default=None, -# help="Python path to the agent config", -# ) -# parser.add_argument( -# "--relaunch_mode", -# default=None, -# type=str, -# choices=[None, "incomplete_only", "all_errors", "server_errors"], -# help="Find all incomplete experiments and relaunch them.", -# ) -# parser.add_argument( -# "--extra_kwargs", -# default="{}", -# type=str2dict, -# help="Extra arguments to pass to the experiment group.", -# ) - -# parser.add_argument( -# "-y", "--auto_accept", action="store_true", help="Skip the prompt to accept the experiment" -# ) - -# parser.add_argument("--shuffle_jobs", action="store_true", help="Shuffle the jobs") - -# args, unknown = parser.parse_known_args() - -# # if relaunch_mode is not None, we will relaunch the experiments -# if args.relaunch_mode is not None: -# assert args.exp_root is not None, "You must specify an exp_root to relaunch experiments." 
-# exp_args_list, exp_dir = relaunch_study(args.exp_config, args.relaunch_mode) -# else: -# # we launch an experiment using the exp_config -# assert args.exp_config is not None, "You must specify an exp_config." -# study_func = import_object(args.exp_config) -# if args.agent_config is not None: -# agent = import_object(args.agent_config) -# exp_args_list, exp_dir = study_agent_on_benchmark( -# args.exp_root, study_func, agent, args.benchmark, args.extra_kwargs -# ) -# else: -# exp_args_list, exp_dir = make_study(args.exp_root, study_func, args.extra_kwargs) - -# message = f"\nYou are about to launch {len(exp_args_list)} experiments in {exp_dir}.\nPress Y to continue.\n" - -# if args.shuffle_jobs: -# logging.info("Shuffling jobs") -# random.shuffle(exp_args_list) - -# if args.auto_accept: -# logging.info(message) -# answer = "y" -# else: -# answer = input(message) - -# if answer.lower() != "y": -# logging.info("Aborting.") -# else: -# run_experiments(args.n_jobs, exp_args_list, exp_dir) - - -# if __name__ == "__main__": -# main() diff --git a/src/agentlab/llm/llm_utils.py b/src/agentlab/llm/llm_utils.py index 0e68e0823..1a8d8b703 100644 --- a/src/agentlab/llm/llm_utils.py +++ b/src/agentlab/llm/llm_utils.py @@ -7,16 +7,13 @@ import re import time from functools import cache -from pathlib import Path from typing import TYPE_CHECKING from warnings import warn import numpy as np import tiktoken import yaml -from joblib import Memory from langchain.schema import BaseMessage, HumanMessage, SystemMessage -from langchain_openai import ChatOpenAI from openai import BadRequestError, RateLimitError from PIL import Image from transformers import AutoModel, AutoTokenizer @@ -451,22 +448,6 @@ def parse_html_tags(text, keys=(), optional_keys=(), merge_multiple=False): return content_dict, valid, retry_message -class ChatCached: - # I wish I could extend ChatOpenAI, but it is somehow locked, I don't know if it's pydantic soercey. 
- - def __init__(self, chat, memory=None): - self.chat = chat - self.memory = memory if memory else Memory(location=Path.home() / "llm-cache", verbose=10) - self._call = self.memory.cache(self.chat.__call__, ignore=["self"]) - self._generate = self.memory.cache(self.chat.generate, ignore=["self"]) - - def __call__(self, messages): - return self._call(messages) - - def generate(self, messages): - return self._generate(messages) - - def download_and_save_model(model_name: str, save_dir: str = "."): model = AutoModel.from_pretrained(model_name) model.save_pretrained(save_dir) diff --git a/tests/experiments/test_graph_execution.py b/tests/experiments/test_graph_execution.py new file mode 100644 index 000000000..0522de198 --- /dev/null +++ b/tests/experiments/test_graph_execution.py @@ -0,0 +1,96 @@ +import pytest +from agentlab.experiments.graph_execution import ( + execute_task_graph, + add_dependencies, + make_dask_client, +) +from time import time, sleep +from browsergym.experiments.loop import ExpArgs, EnvArgs + +TASK_TIME = 3 + + +# Mock implementation of the ExpArgs class with timestamp checks +class MockedExpArgs: + def __init__(self, exp_id, depends_on=None): + self.exp_id = exp_id + self.depends_on = depends_on if depends_on else [] + self.start_time = None + self.end_time = None + + def run(self): + self.start_time = time() + + # # simulate playwright code (this was causing issues due to python async loop) + # import playwright.sync_api + + # pw = playwright.sync_api.sync_playwright().start() + # pw.selectors.set_test_id_attribute("mytestid") + sleep(TASK_TIME) # Simulate task execution time + self.end_time = time() + return self + + +def test_execute_task_graph(): + # Define a list of ExpArgs with dependencies + exp_args_list = [ + MockedExpArgs(exp_id="task1", depends_on=[]), + MockedExpArgs(exp_id="task2", depends_on=["task1"]), + MockedExpArgs(exp_id="task3", depends_on=["task1"]), + MockedExpArgs(exp_id="task4", depends_on=["task2", "task3"]), + ] + + 
with make_dask_client(n_worker=5): + results = execute_task_graph(exp_args_list) + + exp_args_list = [results[task_id] for task_id in ["task1", "task2", "task3", "task4"]] + + # Verify that all tasks were executed in the proper order + assert exp_args_list[0].start_time < exp_args_list[1].start_time + assert exp_args_list[0].start_time < exp_args_list[2].start_time + assert exp_args_list[1].end_time < exp_args_list[3].start_time + assert exp_args_list[2].end_time < exp_args_list[3].start_time + + # Verify that parallel tasks (task2 and task3) started within a short time of each other + parallel_start_diff = abs(exp_args_list[1].start_time - exp_args_list[2].start_time) + assert parallel_start_diff < 1.5 # Allow for a small delay + + # Ensure that the entire task graph took the expected amount of time + total_time = exp_args_list[-1].end_time - exp_args_list[0].start_time + assert ( + total_time >= TASK_TIME * 3 + ) # Critical path is 3 sequential tasks: task1, then task2/task3, then task4 + + +def test_add_dependencies(): + # Prepare a simple list of ExpArgs + + def make_exp_args(task_name, exp_id): + return ExpArgs(agent_args=None, env_args=EnvArgs(task_name=task_name), exp_id=exp_id) + + exp_args_list = [ + make_exp_args("task1", "1"), + make_exp_args("task2", "2"), + make_exp_args("task3", "3"), + ] + + # Define simple task_dependencies + task_dependencies = {"task1": ["task2"], "task2": [], "task3": ["task1"]} + + # Call the function + modified_list = add_dependencies(exp_args_list, task_dependencies) + + # Verify dependencies + assert modified_list[0].depends_on == ("2",) # task1 depends on task2 + assert modified_list[1].depends_on == () # task2 has no dependencies + assert modified_list[2].depends_on == ("1",) # task3 depends on task1 + + # assert raise if task_dependencies is wrong + task_dependencies = {"task1": ["task2"], "task2": [], "task4": ["task3"]} + with pytest.raises(ValueError): + add_dependencies(exp_args_list, task_dependencies) + + +if __name__ == 
"__main__": + test_execute_task_graph() + # test_add_dependencies() diff --git a/tests/experiments/test_launch_exp.py b/tests/experiments/test_launch_exp.py index 98380506c..b3e1fcd84 100644 --- a/tests/experiments/test_launch_exp.py +++ b/tests/experiments/test_launch_exp.py @@ -25,11 +25,8 @@ def test_relaunch_study(): assert len(exp_args_list) == 2 -if __name__ == "__main__": - test_relaunch_study() - - -def test_launch_system(): +@pytest.mark.repeat(3) # there was a stochastic bug caused by the asyncio loop not being started +def test_launch_system(backend="dask"): exp_args_list = [] for seed in range(3): exp_args_list.append( @@ -45,16 +42,32 @@ def test_launch_system(): with tempfile.TemporaryDirectory() as tmp_dir: study_dir = make_study_dir(tmp_dir, "generic_agent_test") - run_experiments(n_jobs=3, exp_args_list=exp_args_list, exp_dir=study_dir) + run_experiments( + n_jobs=2, exp_args_list=exp_args_list, exp_dir=study_dir, parallel_backend=backend + ) results_df = inspect_results.load_result_df(study_dir, progress_fn=None) assert len(results_df) == len(exp_args_list) + for _, row in results_df.iterrows(): + if row.stack_trace is not None: + print(row.stack_trace) + assert row.err_msg is None + assert row.cum_reward == 1.0 + global_report = inspect_results.global_report(results_df) assert len(global_report) == 2 - assert global_report.avg_reward.iloc[0] == 1.0 assert global_report.std_err.iloc[0] == 0 assert global_report.n_completed.iloc[0] == "3/3" + assert global_report.avg_reward.iloc[0] == 1.0 + + +def test_launch_system_joblib(): + test_launch_system(backend="joblib") + + +def test_launch_system_sequntial(): + test_launch_system(backend="sequential") @pytest.mark.pricy @@ -82,4 +95,6 @@ def test_4o_mini_on_miniwob_tiny_test(): if __name__ == "__main__": - test_4o_mini_on_miniwob_tiny_test() + # test_4o_mini_on_miniwob_tiny_test() + # test_launch_system() + test_launch_system_joblib()