From df650ae4484775a3d1a30784233c300342c04dfd Mon Sep 17 00:00:00 2001
From: Ryan-Rhys Griffiths
Date: Wed, 11 Sep 2024 11:11:35 -0700
Subject: [PATCH] Removing tests moved to aviary-internal

Removing tests related to `OpenAISFTOpt` that have been moved to
aviary-internal for the time being.
---
 tests/test_optimizer.py | 251 ----------------------------------------
 1 file changed, 251 deletions(-)

diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py
index ef15a715..25661099 100644
--- a/tests/test_optimizer.py
+++ b/tests/test_optimizer.py
@@ -12,7 +12,6 @@
     default_optimizer_factory,
 )
 from ldp.alg.optimizer.ape import APEOpt, APEScoreFn, Example
-from ldp.alg.optimizer.openai_sft_optimizer import OpenAISFTOpt, OpenAISFTOptConfig
 from ldp.alg.rollout import RolloutManager
 from ldp.data_structures import Trajectory, Transition
 from ldp.graph.common_ops import FxnOp, LLMCallOp, MemoryOp, PromptOp
@@ -27,9 +26,6 @@
 from ldp.graph.ops import GradInType, Op, OpCtx, OpResult
 from ldp.llms import LLMModel, append_to_sys
 
-from . import CILLMModelNames
-from .conftest import IN_GITHUB_ACTIONS
-
 
 @pytest.mark.parametrize(
     ("agent_cls", "optimizer_cls", "optimizer_kwargs"),
@@ -155,253 +151,6 @@ async def forward(xi_: str, yi_: str) -> OpResult[int]:
     raise AssertionError("Failed to complete optimization after retries.")
 
 
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason="Flaky test because of the stochasticity of LLM completion",
-)
-@pytest.mark.parametrize(
-    ("num_transitions_per_traj", "opt_config"),
-    [
-        (1, {"buffer_size": 10, "return_threshold": 5.0}),
-        # (10, {"buffer_size": 20, "return_threshold": None}),  # Skipping - takes 4+ minutes
-    ],
-)
-@pytest.mark.usefixtures("seed_zero")
-async def test_openai_sft_optimizer(
-    num_transitions_per_traj: int, opt_config: dict
-) -> None:
-    prompt_op = PromptOp("Who Are you?")
-    package_msg_op = FxnOp(append_to_sys)
-    llm_config = {"model": CILLMModelNames.OPENAI.value}
-    llm_call_op = LLMCallOp()
-
-    @compute_graph()
-    async def forward():
-        """Perform a forward pass through the model and calculate the loss."""
-        s = await prompt_op()
-        msg = await package_msg_op(s)
-        return await llm_call_op(llm_config, msg)
-
-    opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))
-
-    # Fixed set of rewards for the validation set
-    fixed_rewards = [6, 5, 7, 9, 3, 6, 8, 4, 1, 10]
-
-    # Build validation set
-    for _i in range(10):  # Generate 10 validation examples
-        res_list = [await forward() for _ in range(num_transitions_per_traj)]
-        rewards = fixed_rewards[:num_transitions_per_traj]
-        for res, _ in zip(
-            res_list, rewards, strict=False
-        ):  # Ignore the reward variable
-            res.compute_grads(backward_fns={FxnOp: ste})
-
-        trajectory = Trajectory(
-            steps=[
-                Transition(
-                    timestep=0,
-                    agent_state=None,
-                    next_agent_state=None,
-                    observation=Transition.NO_OBSERVATION,
-                    next_observation=Transition.NO_OBSERVATION,
-                    action=res,
-                    reward=reward,
-                    done=False,
-                )
-                for res, reward in zip(res_list, rewards, strict=False)
-            ]
-        )
-
-        opt.aggregate_trajectory(trajectory, buffer_type="validation")
-
-    # Build training set
-    for _i in range(20):  # Re-run until buffer is full
-        res_list = [await forward() for _ in range(num_transitions_per_traj)]
-        rewards = [10 for _ in range(num_transitions_per_traj)]
-        for res, _ in zip(
-            res_list, rewards, strict=False
-        ):  # Ignore the reward variable
-            res.compute_grads(backward_fns={FxnOp: ste})
-
-        trajectory = Trajectory(
-            steps=[
-                Transition(
-                    timestep=0,
-                    agent_state=None,
-                    next_agent_state=None,
-                    observation=Transition.NO_OBSERVATION,
-                    next_observation=Transition.NO_OBSERVATION,
-                    action=res,
-                    reward=reward,
-                    done=False,
-                )
-                for res, reward in zip(res_list, rewards, strict=False)
-            ]
-        )
-
-        opt.aggregate_trajectory(trajectory)
-
-    await opt.update()
-
-    # Check that training examples were actually stored in the buffer
-    assert len(opt.train_buffer) >= 2, "Expected examples to be stored in the buffer."
-
-    with eval_mode():
-        for _ in range(5):
-            res = await forward()
-            if "I'm" in res.value.content or "I am" in res.value.content:
-                return
-    raise AssertionError("Failed to perform expert iteration training")
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason="Flaky test because of the stochasticity of LLM completion",
-)
-@pytest.mark.usefixtures("seed_zero")
-async def test_openai_sft_optimizer_return_threshold() -> None:
-    prompt_op = PromptOp("Who Are you?")
-    package_msg_op = FxnOp(append_to_sys)
-    llm_config = {"model": "gpt-4o-mini"}  # Check gpt-4o finetuning.
-    llm_call_op = LLMCallOp()
-
-    @compute_graph()
-    async def forward():
-        """Perform a forward pass through the model and calculate the loss."""
-        s = await prompt_op()
-        msg = await package_msg_op(s)
-        return await llm_call_op(llm_config, msg)
-
-    # Set up the optimizer with a reward threshold higher than the test rewards
-    opt_config = {"buffer_size": 10, "return_threshold": 5.0}
-    opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))
-
-    # Test with rewards lower than the threshold
-    res_list = [await forward()]
-    rewards = [3]  # Lower than the threshold
-    for res, _ in zip(res_list, rewards, strict=False):
-        res.compute_grads(backward_fns={FxnOp: ste})
-
-    trajectory = Trajectory(
-        steps=[
-            Transition(
-                timestep=0,
-                agent_state=None,
-                next_agent_state=None,
-                observation=Transition.NO_OBSERVATION,
-                next_observation=Transition.NO_OBSERVATION,
-                action=res,
-                reward=reward,
-                done=False,
-            )
-            for res, reward in zip(res_list, rewards, strict=False)
-        ]
-    )
-
-    opt.aggregate_trajectory(trajectory)
-
-    # Assert that the train buffer remains empty
-    assert not opt.train_buffer, "Expected train buffer to be empty."
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason="Flaky test because of the stochasticity of LLM completion",
-)
-async def test_openai_sft_optimizer_with_tool_calls() -> None:
-    agent = ReActAgent(
-        llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
-    )
-    opt = OpenAISFTOpt.from_agent(agent)
-    rollout = RolloutManager(agent)
-
-    results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
-        environment_factory=lambda: DummyEnv(end_immediately=False),
-        max_steps=2,
-        batch_size=12,
-    )
-
-    for traj, _ in results:
-        if traj.failed:
-            continue
-
-        assert len(traj.steps) == 2
-        traj.steps[0].reward = 0.0
-        traj.steps[1].reward = 1.0
-        traj.steps[1].truncated = False
-        traj.steps[1].done = True
-
-        for step in traj.steps:
-            assert step.action is not None, "Expected step.action to be non-None"
-            step.action.compute_grads()
-
-        opt.aggregate_trajectory(traj)
-
-    await opt.update()
-
-
-@pytest.mark.asyncio
-@pytest.mark.skipif(
-    IN_GITHUB_ACTIONS,
-    reason=(
-        "Flaky test because of the stochasticity of LLM completion; small rate limits"
-    ),
-)
-async def test_openai_sft_optimizer_with_length_penalty() -> None:
-    agent = ReActAgent(
-        llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
-    )
-    opt_config = {
-        "buffer_size": 10,
-        "return_threshold": 5.0,  # Set return threshold to 5.0
-    }
-    opt = OpenAISFTOpt.from_agent(agent, config=OpenAISFTOptConfig(**opt_config))
-    rollout = RolloutManager(agent)
-
-    # Define a penalty function that penalizes the length of the return list
-    def length_penalty(length: int) -> float:
-        return 1 / (1 + length)  # Simple penalty based on list length
-
-    # Sample trajectories from the environment
-    results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
-        environment_factory=lambda: DummyEnv(end_immediately=False),
-        max_steps=2,
-        batch_size=12,
-    )
-
-    # Modify the first trajectory to create a short trajectory with a length of 1
-    short_trajectory = results[0][0]
-    short_trajectory.steps = short_trajectory.steps[:1]  # Keep only the first step
-    short_trajectory.steps[0].reward = 12.0  # High reward
-    short_trajectory.steps[0].done = True
-    assert (
-        short_trajectory.steps[0].action is not None
-    ), "Expected step.action to be non-None"
-    short_trajectory.steps[0].action.compute_grads()
-
-    # Apply the penalty function when aggregating the short trajectory
-    opt.aggregate_trajectory(short_trajectory, len_penalty_fn=length_penalty)
-
-    # Modify the second trajectory to create a long trajectory with a length of 10
-    long_trajectory = results[1][0]
-    long_trajectory.steps *= 5  # Repeat steps to make 10
-    for step in long_trajectory.steps:
-        step.reward = 0.5  # Low reward for each step
-        step.truncated = False
-        step.done = False
-        assert step.action is not None, "Expected step.action to be non-None"
-        step.action.compute_grads()
-    long_trajectory.steps[-1].done = True  # Mark the last step as done
-
-    # Apply the penalty function when aggregating the long trajectory
-    opt.aggregate_trajectory(long_trajectory, len_penalty_fn=length_penalty)
-
-    # Verify that the short trajectory is in the buffer and the long one is not
-    assert len(opt.train_buffer) == 1, "Expected only one trajectory in the buffer."
-
-
 def mem_opt_failed(exc: BaseException) -> bool:
     # Sometimes the memory opt fails to converge because the training examples
     # are not informative. Try again