Removing tests moved to aviary-internal
Removing tests related to `OpenAISFTOpt` that have been moved to aviary-internal for the time being.
Ryan-Rhys authored Sep 11, 2024
1 parent c054cda · commit df650ae
Showing 1 changed file with 0 additions and 251 deletions.
tests/test_optimizer.py: 0 additions & 251 deletions
@@ -12,7 +12,6 @@
default_optimizer_factory,
)
from ldp.alg.optimizer.ape import APEOpt, APEScoreFn, Example
from ldp.alg.optimizer.openai_sft_optimizer import OpenAISFTOpt, OpenAISFTOptConfig
from ldp.alg.rollout import RolloutManager
from ldp.data_structures import Trajectory, Transition
from ldp.graph.common_ops import FxnOp, LLMCallOp, MemoryOp, PromptOp
@@ -27,9 +26,6 @@
from ldp.graph.ops import GradInType, Op, OpCtx, OpResult
from ldp.llms import LLMModel, append_to_sys

from . import CILLMModelNames
from .conftest import IN_GITHUB_ACTIONS


@pytest.mark.parametrize(
("agent_cls", "optimizer_cls", "optimizer_kwargs"),
@@ -155,253 +151,6 @@ async def forward(xi_: str, yi_: str) -> OpResult[int]:
raise AssertionError("Failed to complete optimization after retries.")


@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
@pytest.mark.parametrize(
("num_transitions_per_traj", "opt_config"),
[
(1, {"buffer_size": 10, "return_threshold": 5.0}),
# (10, {"buffer_size": 20, "return_threshold": None}), # Skipping - takes 4+ minutes
],
)
@pytest.mark.usefixtures("seed_zero")
async def test_openai_sft_optimizer(
num_transitions_per_traj: int, opt_config: dict
) -> None:
prompt_op = PromptOp("Who Are you?")
package_msg_op = FxnOp(append_to_sys)
llm_config = {"model": CILLMModelNames.OPENAI.value}
llm_call_op = LLMCallOp()

@compute_graph()
async def forward():
"""Perform a forward pass through the model and calculate the loss."""
s = await prompt_op()
msg = await package_msg_op(s)
return await llm_call_op(llm_config, msg)

opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))

# Fixed set of rewards for the validation set
fixed_rewards = [6, 5, 7, 9, 3, 6, 8, 4, 1, 10]

# Build validation set
for _i in range(10): # Generate 10 validation examples
res_list = [await forward() for _ in range(num_transitions_per_traj)]
rewards = fixed_rewards[:num_transitions_per_traj]
for res, _ in zip(
res_list, rewards, strict=False
): # Ignore the reward variable
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory, buffer_type="validation")

# Build training set
for _i in range(20): # Re-run until buffer is full
res_list = [await forward() for _ in range(num_transitions_per_traj)]
rewards = [10 for _ in range(num_transitions_per_traj)]
for res, _ in zip(
res_list, rewards, strict=False
): # Ignore the reward variable
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory)

await opt.update()

# Check that training examples were actually stored in the buffer
assert len(opt.train_buffer) >= 2, "Expected examples to be stored in the buffer."

with eval_mode():
for _ in range(5):
res = await forward()
if "I'm" in res.value.content or "I am" in res.value.content:
return
raise AssertionError("Failed to perform expert iteration training")


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
@pytest.mark.usefixtures("seed_zero")
async def test_openai_sft_optimizer_return_threshold() -> None:
prompt_op = PromptOp("Who Are you?")
package_msg_op = FxnOp(append_to_sys)
llm_config = {"model": "gpt-4o-mini"} # Check gpt-4o finetuning.
llm_call_op = LLMCallOp()

@compute_graph()
async def forward():
"""Perform a forward pass through the model and calculate the loss."""
s = await prompt_op()
msg = await package_msg_op(s)
return await llm_call_op(llm_config, msg)

# Set up the optimizer with a reward threshold higher than the test rewards
opt_config = {"buffer_size": 10, "return_threshold": 5.0}
opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))

# Test with rewards lower than the threshold
res_list = [await forward()]
rewards = [3] # Lower than the threshold
for res, _ in zip(res_list, rewards, strict=False):
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory)

# Assert that the train buffer remains empty
assert not opt.train_buffer, "Expected train buffer to be empty."


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
async def test_openai_sft_optimizer_with_tool_calls() -> None:
agent = ReActAgent(
llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
)
opt = OpenAISFTOpt.from_agent(agent)
rollout = RolloutManager(agent)

results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
environment_factory=lambda: DummyEnv(end_immediately=False),
max_steps=2,
batch_size=12,
)

for traj, _ in results:
if traj.failed:
continue

assert len(traj.steps) == 2
traj.steps[0].reward = 0.0
traj.steps[1].reward = 1.0
traj.steps[1].truncated = False
traj.steps[1].done = True

for step in traj.steps:
assert step.action is not None, "Expected step.action to be non-None"
step.action.compute_grads()

opt.aggregate_trajectory(traj)

await opt.update()


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason=(
"Flaky test because of the stochasticity of LLM completion; small rate limits"
),
)
async def test_openai_sft_optimizer_with_length_penalty() -> None:
agent = ReActAgent(
llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
)
opt_config = {
"buffer_size": 10,
"return_threshold": 5.0, # Set return threshold to 5.0
}
opt = OpenAISFTOpt.from_agent(agent, config=OpenAISFTOptConfig(**opt_config))
rollout = RolloutManager(agent)

# Define a penalty function that penalizes the length of the return list
def length_penalty(length: int) -> float:
return 1 / (1 + length) # Simple penalty based on list length

# Sample trajectories from the environment
results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
environment_factory=lambda: DummyEnv(end_immediately=False),
max_steps=2,
batch_size=12,
)

# Modify the first trajectory to create a short trajectory with a length of 1
short_trajectory = results[0][0]
short_trajectory.steps = short_trajectory.steps[:1] # Keep only the first step
short_trajectory.steps[0].reward = 12.0 # High reward
short_trajectory.steps[0].done = True
assert (
short_trajectory.steps[0].action is not None
), "Expected step.action to be non-None"
short_trajectory.steps[0].action.compute_grads()

# Apply the penalty function when aggregating the short trajectory
opt.aggregate_trajectory(short_trajectory, len_penalty_fn=length_penalty)

# Modify the second trajectory to create a long trajectory with a length of 10
long_trajectory = results[1][0]
long_trajectory.steps *= 5 # Repeat steps to make 10
for step in long_trajectory.steps:
step.reward = 0.5 # Low reward for each step
step.truncated = False
step.done = False
assert step.action is not None, "Expected step.action to be non-None"
step.action.compute_grads()
long_trajectory.steps[-1].done = True # Mark the last step as done

# Apply the penalty function when aggregating the long trajectory
opt.aggregate_trajectory(long_trajectory, len_penalty_fn=length_penalty)

# Verify that the short trajectory is in the buffer and the long one is not
assert len(opt.train_buffer) == 1, "Expected only one trajectory in the buffer."


def mem_opt_failed(exc: BaseException) -> bool:
# Sometimes the memory opt fails to converge because the training examples
# are not informative. Try again
