diff --git a/ldp/alg/optimizer/openai_sft_optimizer.py b/ldp/alg/optimizer/openai_sft_optimizer.py
deleted file mode 100644
index 00ce6eee..00000000
--- a/ldp/alg/optimizer/openai_sft_optimizer.py
+++ /dev/null
@@ -1,309 +0,0 @@
-"""This module defines an expert iteration optimizer for black-box OpenAI LLMs.
-
-The optimizer manages the collation and formatting of training rollout data and initiates fine-tuning jobs through
-OpenAI's API:
-
-https://platform.openai.com/docs/guides/fine-tuning/analyzing-your-fine-tuned-model
-
-For expert iteration, see:
-
-Havrilla et al. 2024. Teaching large language models to reason with reinforcement learning.
-arXiv preprint arXiv:2403.04642. (https://arxiv.org/pdf/2403.04642)
-
-Example Usage:
-    Instantiate the `OpenAISFTOpt` with the necessary configuration.
-    Accumulate training rollout examples by calling `aggregate_trajectory`.
-    Update the model by invoking `update`, which prepares the training data,
-    uploads it, and triggers the fine-tuning process.
-"""
-
-import json
-import logging
-import tempfile
-import time
-from collections.abc import Callable
-from typing import Any, Self, cast
-
-import openai
-from pydantic import BaseModel, ConfigDict, Field
-
-from ldp.agent import ReActAgent
-from ldp.alg.optimizer.opt import Optimizer
-from ldp.data_structures import Trajectory
-from ldp.graph.common_ops import LLMCallOp
-from ldp.graph.ops import OpResult
-
-logger = logging.getLogger(__name__)
-
-
-class OpenAISFTOptConfig(BaseModel):
-    """Configuration class for the OpenAISFTOpt optimizer.
-
-    This class holds various configuration parameters for the optimizer.
-    """
-
-    lr: float = 0.001
-    num_epochs: int = 1
-    buffer_size: int | None = Field(
-        default=None,
-        description="Maximum number of finetuning examples to accumulate. "
-        "If None, the buffer has no size limit.",
-    )
-    val_frac: float = 0.1
-    reward_discount: float = 1.0  # Discount factor in [0, 1] for rewards
-    return_threshold: float | None = Field(
-        default=None,
-        description="Minimum return required for a trajectory to be added to the training buffer. If None, "
-        "all trajectories are added.",
-    )
-
-
-class OpenAISFTOpt(BaseModel, Optimizer):
-    """An optimizer for fine-tuning black-box LLMs that interact via an API.
-
-    This Expert Iteration (SFT) optimizer handles the aggregation of training data,
-    manages a buffer of training examples, and initiates fine-tuning jobs via the
-    OpenAI API.
- """ - - model_config = ConfigDict(arbitrary_types_allowed=True) - - # Configuration - config: OpenAISFTOptConfig = Field(default_factory=OpenAISFTOptConfig) - log_to_wandb: bool = False - llm_call_op: LLMCallOp - client: openai.OpenAI = Field(default_factory=openai.OpenAI) - - # State - train_buffer: list = Field(default_factory=list) - val_buffer: list = Field(default_factory=list) - fine_tune_job_id: str | None = None - - train_dataset: list[Any] | None = None - val_dataset: list[Any] | None = None - - def __init__(self, **data): - super().__init__(**data) - - # Validate and populate the training and validation buffers - if self.train_dataset: - self.train_buffer.extend(self.train_dataset) - if self.val_dataset: - self.val_buffer.extend(self.val_dataset) - - @property - def buffer_is_full(self) -> bool: - return ( - self.config.buffer_size is not None - and len(self.train_buffer) >= self.config.buffer_size - ) - - def aggregate_trajectory( - self, - trajectory: Trajectory, - buffer_type: str = "train", - len_penalty_fn: Callable[[int], float] | None = None, - ) -> None: - """Adds training rollout examples from a trajectory to the training buffer. - - This method extracts rollouts and their corresponding discounted returns from a trajectory and stores them - in the appropriate buffer (training or validation) if they meet the return threshold criteria. - - We apply a weight of 1 to actions and a weight of 0 to states. This reflects the fact that we want to train the - agent using P(action | state) as the target distribution. Note that in the OpenAI API, the weight may only be - applied to assistant messages. - - Args: - trajectory: The trajectory containing rollouts and rewards. - buffer_type: The buffer to which the trajectory should be added. Must be either "train" or "validation". - len_penalty_fn: An optional callable that takes an integer (the length of - the list of discounted returns) and returns a scalar penalty to be applied to the discounted return. - - Raises: - RuntimeError: If a rollout in the trajectory does not have an associated compute graph. - ValueError: If the supplied buffer type is invalid. Must be either "train" or "validation". - """ - # Validate buffer type - if buffer_type not in {"train", "validation"}: - raise ValueError('buffer_type must be either "train" or "validation".') - - # Compute the discounted returns - discounted_returns = trajectory.compute_discounted_returns( - self.config.reward_discount - ) - - # Apply the penalty on the length of the trajectory if a penalty function is provided - if len_penalty_fn is not None: - penalty = len_penalty_fn(len(discounted_returns)) - modified_return = discounted_returns[0] * penalty - else: - modified_return = discounted_returns[0] - - # Don't add trajectory to the buffer if it failed or doesn't meet the return threshold - if trajectory.failed or ( - self.config.return_threshold is not None - and modified_return < self.config.return_threshold - ): - return - - traj_msgs = [] - for step in trajectory.steps: - action_call_id = cast(OpResult, step.action).call_id - if action_call_id is None: - raise RuntimeError("Received an action without compute graph attached.") - call_ids = self.llm_call_op.get_call_ids({action_call_id.run_id}) - for call_id in call_ids: - if ( - self.llm_call_op.ctx.get(call_id, "grad_output", default=None) - is None - ): - # This op call was pruned from backward compute graph - skip. 
-                    continue
-
-                _, input_kwargs = self.llm_call_op.ctx.get(call_id, "input")
-                outputs = self.llm_call_op.ctx.get(call_id, "output").value.model_dump()
-
-                # Add "weight": 1 to the outputs dictionary. NB: weight should ONLY be added to assistant messages. All
-                # output messages are assumed to be assistant messages and will throw an error otherwise.
-                outputs["weight"] = 1
-
-                # Just supply list of messages here. Call model_dump on each element of list. Add weight = 0 for input
-                traj_msgs += [
-                    {
-                        **msg.model_dump(),
-                        **({"weight": 0} if msg.role == "assistant" else {}),
-                    }
-                    for msg in OpResult.unwrap_value(input_kwargs["msgs"])
-                ]
-                traj_msgs.append(outputs)
-
-        # Choose the appropriate buffer
-        target_buffer = self.train_buffer if buffer_type == "train" else self.val_buffer
-
-        # Add trajectory to the specified buffer. Buffer is List[List[dict]]
-        target_buffer.append(traj_msgs)
-
-        # If buffer size is set, ensure that the buffer does not exceed the specified size. If it does exceed the size
-        # remove the oldest samples.
-        if (
-            self.config.buffer_size is not None
-            and len(target_buffer) >= self.config.buffer_size
-        ):
-            # Calculate the starting index for slicing
-            start_index = len(target_buffer) - self.config.buffer_size
-            # Assign the last `buffer_size` elements to `target_buffer`
-            target_buffer[:] = target_buffer[start_index:]
-
-    async def update(self, check_for_completion: bool = False):
-        """Updates the model parameters based on the accumulated training data.
-
-        This method processes the accumulated training data by formatting it into the appropriate structure for the
-        API, uploads it, and then initiates a fine-tuning job. It is important to note that the OpenAI finetuning API
-        has a minimum requirement of 10 training examples (trajectories) to perform fine-tuning.
-
-        Args:
-            check_for_completion: A flag to indicate whether to check for the completion of the fine-tuning job.
-
-        Raises:
-            ValueError: If the training data fails to upload or the fine-tuning job fails to start.
- """ - # Prepare the data for fine-tuning in chat format - training_data = [{"messages": traj} for traj in self.train_buffer] - validation_data = ( - [{"messages": traj} for traj in self.val_buffer] - if self.val_buffer - else None - ) - - if not training_data: - return - - def write_to_tempfile(data): - with tempfile.NamedTemporaryFile( - delete=False, suffix=".jsonl" - ) as temp_file: - for example in data: - temp_file.write((json.dumps(example) + "\n").encode("utf-8")) - return temp_file.name - - train_temp_file_path = write_to_tempfile(training_data) - val_temp_file_path = ( - write_to_tempfile(validation_data) if validation_data else None - ) - - try: - with open(train_temp_file_path, "rb") as train_file: - file_id = self.client.files.create( - file=train_file, purpose="fine-tune" - ).id - - val_file_id = None - if val_temp_file_path: - with open(val_temp_file_path, "rb") as val_file: - val_file_id = self.client.files.create( - file=val_file, purpose="fine-tune" - ).id - - fine_tune_job = self.client.fine_tuning.jobs.create( - training_file=file_id, - validation_file=val_file_id, - model="gpt-3.5-turbo", - ) - - self.fine_tune_job_id = fine_tune_job.id - logger.info(f"Fine-tuning job created with ID: {self.fine_tune_job_id}") - - # Check the status of the job periodically until it completes - if check_for_completion: - while True: - job_status = self.client.fine_tuning.jobs.retrieve( - self.fine_tune_job_id - ) - status = job_status.status - - if status == "succeeded": - logger.info("Fine-tuning job succeeded.") - break - if status == "failed": - logger.error( - f"Fine-tuning job failed with status: {job_status}" - ) - raise ValueError( - f"Fine-tuning job failed with status: {job_status}" - ) - logger.info( - f"Fine-tuning job is still running. Current status: {status}" - ) - time.sleep(30) # Wait 30 seconds before checking the status again - - except (openai.APIConnectionError, openai.RateLimitError, openai.APIError) as e: - logger.exception("Error during fine-tuning job creation") - raise ValueError("Failed to create the fine-tuning job.") from e - - def clear_train_buffer(self): - """Clear the training buffer.""" - self.train_buffer.clear() - - def clear_val_buffer(self): - """Clear the validation buffer.""" - self.val_buffer.clear() - - @classmethod - def from_agent(cls, agent: ReActAgent, **kwargs) -> Self: - """Creates an instance of the OpenAISFTOpt class from an existing ReActAgent by extracting. - - the LLM call operation (llm_call_op) from the provided ReActAgent. At the moment, only initialization - from ReActAgent is supported. - - Args: - agent: The ReActAgent from which to extract the LLM call operation. - **kwargs: Additional keyword arguments to pass to the OpenAISFTOpt constructor. - - Returns: - OpenAISFTOpt: An instance of the OpenAISFTOpt class initialized with the LLM call - operation from the provided ReActAgent. 
- """ - return cls( - llm_call_op=agent._react_module.tool_select_module.llm_call_op, - **kwargs, - ) diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index ef15a715..d7d43f19 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -1,9 +1,6 @@ -from typing import Any - import pytest import tenacity import tree -from aviary.env import DummyEnv from ldp.agent import Agent, MemoryAgent, ReActAgent from ldp.alg.optimizer import ( @@ -12,8 +9,6 @@ default_optimizer_factory, ) from ldp.alg.optimizer.ape import APEOpt, APEScoreFn, Example -from ldp.alg.optimizer.openai_sft_optimizer import OpenAISFTOpt, OpenAISFTOptConfig -from ldp.alg.rollout import RolloutManager from ldp.data_structures import Trajectory, Transition from ldp.graph.common_ops import FxnOp, LLMCallOp, MemoryOp, PromptOp from ldp.graph.gradient_estimators import ( @@ -27,9 +22,6 @@ from ldp.graph.ops import GradInType, Op, OpCtx, OpResult from ldp.llms import LLMModel, append_to_sys -from . import CILLMModelNames -from .conftest import IN_GITHUB_ACTIONS - @pytest.mark.parametrize( ("agent_cls", "optimizer_cls", "optimizer_kwargs"), @@ -155,253 +147,6 @@ async def forward(xi_: str, yi_: str) -> OpResult[int]: raise AssertionError("Failed to complete optimization after retries.") -@pytest.mark.skipif( - IN_GITHUB_ACTIONS, - reason="Flaky test because of the stochasticity of LLM completion", -) -@pytest.mark.parametrize( - ("num_transitions_per_traj", "opt_config"), - [ - (1, {"buffer_size": 10, "return_threshold": 5.0}), - # (10, {"buffer_size": 20, "return_threshold": None}), # Skipping - takes 4+ minutes - ], -) -@pytest.mark.usefixtures("seed_zero") -async def test_openai_sft_optimizer( - num_transitions_per_traj: int, opt_config: dict -) -> None: - prompt_op = PromptOp("Who Are you?") - package_msg_op = FxnOp(append_to_sys) - llm_config = {"model": CILLMModelNames.OPENAI.value} - llm_call_op = LLMCallOp() - - @compute_graph() - async def forward(): - """Perform a forward pass through the model and calculate the loss.""" - s = await prompt_op() - msg = await package_msg_op(s) - return await llm_call_op(llm_config, msg) - - opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config)) - - # Fixed set of rewards for the validation set - fixed_rewards = [6, 5, 7, 9, 3, 6, 8, 4, 1, 10] - - # Build validation set - for _i in range(10): # Generate 10 validation examples - res_list = [await forward() for _ in range(num_transitions_per_traj)] - rewards = fixed_rewards[:num_transitions_per_traj] - for res, _ in zip( - res_list, rewards, strict=False - ): # Ignore the reward variable - res.compute_grads(backward_fns={FxnOp: ste}) - - trajectory = Trajectory( - steps=[ - Transition( - timestep=0, - agent_state=None, - next_agent_state=None, - observation=Transition.NO_OBSERVATION, - next_observation=Transition.NO_OBSERVATION, - action=res, - reward=reward, - done=False, - ) - for res, reward in zip(res_list, rewards, strict=False) - ] - ) - - opt.aggregate_trajectory(trajectory, buffer_type="validation") - - # Build training set - for _i in range(20): # Re-run until buffer is full - res_list = [await forward() for _ in range(num_transitions_per_traj)] - rewards = [10 for _ in range(num_transitions_per_traj)] - for res, _ in zip( - res_list, rewards, strict=False - ): # Ignore the reward variable - res.compute_grads(backward_fns={FxnOp: ste}) - - trajectory = Trajectory( - steps=[ - Transition( - timestep=0, - agent_state=None, - next_agent_state=None, - 
-                    observation=Transition.NO_OBSERVATION,
-                    next_observation=Transition.NO_OBSERVATION,
-                    action=res,
-                    reward=reward,
-                    done=False,
-                )
-                for res, reward in zip(res_list, rewards, strict=False)
-            ]
-        )
-
-        opt.aggregate_trajectory(trajectory)
-
-    await opt.update()
-
-    # Check that training examples were actually stored in the buffer
-    assert len(opt.train_buffer) >= 2, "Expected examples to be stored in the buffer."
-
-    with eval_mode():
-        for _ in range(5):
-            res = await forward()
-            if "I'm" in res.value.content or "I am" in res.value.content:
-                return
-    raise AssertionError("Failed to perform expert iteration training")
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason="Flaky test because of the stochasticity of LLM completion",
-)
-@pytest.mark.usefixtures("seed_zero")
-async def test_openai_sft_optimizer_return_threshold() -> None:
-    prompt_op = PromptOp("Who Are you?")
-    package_msg_op = FxnOp(append_to_sys)
-    llm_config = {"model": "gpt-4o-mini"}  # Check gpt-4o finetuning.
-    llm_call_op = LLMCallOp()
-
-    @compute_graph()
-    async def forward():
-        """Perform a forward pass through the model and calculate the loss."""
-        s = await prompt_op()
-        msg = await package_msg_op(s)
-        return await llm_call_op(llm_config, msg)
-
-    # Set up the optimizer with a reward threshold higher than the test rewards
-    opt_config = {"buffer_size": 10, "return_threshold": 5.0}
-    opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))
-
-    # Test with rewards lower than the threshold
-    res_list = [await forward()]
-    rewards = [3]  # Lower than the threshold
-    for res, _ in zip(res_list, rewards, strict=False):
-        res.compute_grads(backward_fns={FxnOp: ste})
-
-    trajectory = Trajectory(
-        steps=[
-            Transition(
-                timestep=0,
-                agent_state=None,
-                next_agent_state=None,
-                observation=Transition.NO_OBSERVATION,
-                next_observation=Transition.NO_OBSERVATION,
-                action=res,
-                reward=reward,
-                done=False,
-            )
-            for res, reward in zip(res_list, rewards, strict=False)
-        ]
-    )
-
-    opt.aggregate_trajectory(trajectory)
-
-    # Assert that the train buffer remains empty
-    assert not opt.train_buffer, "Expected train buffer to be empty."
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason="Flaky test because of the stochasticity of LLM completion",
-)
-async def test_openai_sft_optimizer_with_tool_calls() -> None:
-    agent = ReActAgent(
-        llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
-    )
-    opt = OpenAISFTOpt.from_agent(agent)
-    rollout = RolloutManager(agent)
-
-    results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
-        environment_factory=lambda: DummyEnv(end_immediately=False),
-        max_steps=2,
-        batch_size=12,
-    )
-
-    for traj, _ in results:
-        if traj.failed:
-            continue
-
-        assert len(traj.steps) == 2
-        traj.steps[0].reward = 0.0
-        traj.steps[1].reward = 1.0
-        traj.steps[1].truncated = False
-        traj.steps[1].done = True
-
-        for step in traj.steps:
-            assert step.action is not None, "Expected step.action to be non-None"
-            step.action.compute_grads()
-
-        opt.aggregate_trajectory(traj)
-
-    await opt.update()
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason=(
-        "Flaky test because of the stochasticity of LLM completion; small rate limits"
-    ),
-)
-async def test_openai_sft_optimizer_with_length_penalty() -> None:
-    agent = ReActAgent(
-        llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
-    )
-    opt_config = {
-        "buffer_size": 10,
-        "return_threshold": 5.0,  # Set return threshold to 5.0
-    }
-    opt = OpenAISFTOpt.from_agent(agent, config=OpenAISFTOptConfig(**opt_config))
-    rollout = RolloutManager(agent)
-
-    # Define a penalty function that penalizes the length of the return list
-    def length_penalty(length: int) -> float:
-        return 1 / (1 + length)  # Simple penalty based on list length
-
-    # Sample trajectories from the environment
-    results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
-        environment_factory=lambda: DummyEnv(end_immediately=False),
-        max_steps=2,
-        batch_size=12,
-    )
-
-    # Modify the first trajectory to create a short trajectory with a length of 1
-    short_trajectory = results[0][0]
-    short_trajectory.steps = short_trajectory.steps[:1]  # Keep only the first step
-    short_trajectory.steps[0].reward = 12.0  # High reward
-    short_trajectory.steps[0].done = True
-    assert (
-        short_trajectory.steps[0].action is not None
-    ), "Expected step.action to be non-None"
-    short_trajectory.steps[0].action.compute_grads()
-
-    # Apply the penalty function when aggregating the short trajectory
-    opt.aggregate_trajectory(short_trajectory, len_penalty_fn=length_penalty)
-
-    # Modify the second trajectory to create a long trajectory with a length of 10
-    long_trajectory = results[1][0]
-    long_trajectory.steps *= 5  # Repeat steps to make 10
-    for step in long_trajectory.steps:
-        step.reward = 0.5  # Low reward for each step
-        step.truncated = False
-        step.done = False
-        assert step.action is not None, "Expected step.action to be non-None"
-        step.action.compute_grads()
-    long_trajectory.steps[-1].done = True  # Mark the last step as done
-
-    # Apply the penalty function when aggregating the long trajectory
-    opt.aggregate_trajectory(long_trajectory, len_penalty_fn=length_penalty)
-
-    # Verify that the short trajectory is in the buffer and the long one is not
-    assert len(opt.train_buffer) == 1, "Expected only one trajectory in the buffer."
-
-
 def mem_opt_failed(exc: BaseException) -> bool:
     # Sometimes the memory opt fails to converge because the training examples
     # are not informative. Try again