Removing OpenAISFTOpt #24

Merged · 4 commits · Sep 11, 2024

Changes from 1 commit
tests/test_optimizer.py: 251 changes (0 additions, 251 deletions)
@@ -12,7 +12,6 @@
default_optimizer_factory,
)
from ldp.alg.optimizer.ape import APEOpt, APEScoreFn, Example
from ldp.alg.optimizer.openai_sft_optimizer import OpenAISFTOpt, OpenAISFTOptConfig
from ldp.alg.rollout import RolloutManager
from ldp.data_structures import Trajectory, Transition
from ldp.graph.common_ops import FxnOp, LLMCallOp, MemoryOp, PromptOp
@@ -27,9 +26,6 @@
from ldp.graph.ops import GradInType, Op, OpCtx, OpResult
from ldp.llms import LLMModel, append_to_sys

from . import CILLMModelNames
from .conftest import IN_GITHUB_ACTIONS


@pytest.mark.parametrize(
("agent_cls", "optimizer_cls", "optimizer_kwargs"),
@@ -155,253 +151,6 @@ async def forward(xi_: str, yi_: str) -> OpResult[int]:
raise AssertionError("Failed to complete optimization after retries.")


@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
@pytest.mark.parametrize(
("num_transitions_per_traj", "opt_config"),
[
(1, {"buffer_size": 10, "return_threshold": 5.0}),
# (10, {"buffer_size": 20, "return_threshold": None}), # Skipping - takes 4+ minutes
],
)
@pytest.mark.usefixtures("seed_zero")
async def test_openai_sft_optimizer(
num_transitions_per_traj: int, opt_config: dict
) -> None:
prompt_op = PromptOp("Who Are you?")
package_msg_op = FxnOp(append_to_sys)
llm_config = {"model": CILLMModelNames.OPENAI.value}
llm_call_op = LLMCallOp()

@compute_graph()
async def forward():
"""Perform a forward pass through the model and calculate the loss."""
s = await prompt_op()
msg = await package_msg_op(s)
return await llm_call_op(llm_config, msg)

opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))

# Fixed set of rewards for the validation set
fixed_rewards = [6, 5, 7, 9, 3, 6, 8, 4, 1, 10]

# Build validation set
for _i in range(10): # Generate 10 validation examples
res_list = [await forward() for _ in range(num_transitions_per_traj)]
rewards = fixed_rewards[:num_transitions_per_traj]
for res, _ in zip(
res_list, rewards, strict=False
): # Ignore the reward variable
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory, buffer_type="validation")

# Build training set
for _i in range(20): # Re-run until buffer is full
res_list = [await forward() for _ in range(num_transitions_per_traj)]
rewards = [10 for _ in range(num_transitions_per_traj)]
for res, _ in zip(
res_list, rewards, strict=False
): # Ignore the reward variable
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory)

await opt.update()

# Check that training examples were actually stored in the buffer
assert len(opt.train_buffer) >= 2, "Expected examples to be stored in the buffer."

with eval_mode():
for _ in range(5):
res = await forward()
if "I'm" in res.value.content or "I am" in res.value.content:
return
raise AssertionError("Failed to perform expert iteration training")


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
@pytest.mark.usefixtures("seed_zero")
async def test_openai_sft_optimizer_return_threshold() -> None:
prompt_op = PromptOp("Who Are you?")
package_msg_op = FxnOp(append_to_sys)
llm_config = {"model": "gpt-4o-mini"} # Check gpt-4o finetuning.
llm_call_op = LLMCallOp()

@compute_graph()
async def forward():
"""Perform a forward pass through the model and calculate the loss."""
s = await prompt_op()
msg = await package_msg_op(s)
return await llm_call_op(llm_config, msg)

# Set up the optimizer with a reward threshold higher than the test rewards
opt_config = {"buffer_size": 10, "return_threshold": 5.0}
opt = OpenAISFTOpt(llm_call_op=llm_call_op, config=OpenAISFTOptConfig(**opt_config))

# Test with rewards lower than the threshold
res_list = [await forward()]
rewards = [3] # Lower than the threshold
for res, _ in zip(res_list, rewards, strict=False):
res.compute_grads(backward_fns={FxnOp: ste})

trajectory = Trajectory(
steps=[
Transition(
timestep=0,
agent_state=None,
next_agent_state=None,
observation=Transition.NO_OBSERVATION,
next_observation=Transition.NO_OBSERVATION,
action=res,
reward=reward,
done=False,
)
for res, reward in zip(res_list, rewards, strict=False)
]
)

opt.aggregate_trajectory(trajectory)

# Assert that the train buffer remains empty
assert not opt.train_buffer, "Expected train buffer to be empty."


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason="Flaky test because of the stochasticity of LLM completion",
)
async def test_openai_sft_optimizer_with_tool_calls() -> None:
agent = ReActAgent(
llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
)
opt = OpenAISFTOpt.from_agent(agent)
rollout = RolloutManager(agent)

results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
environment_factory=lambda: DummyEnv(end_immediately=False),
max_steps=2,
batch_size=12,
)

for traj, _ in results:
if traj.failed:
continue

assert len(traj.steps) == 2
traj.steps[0].reward = 0.0
traj.steps[1].reward = 1.0
traj.steps[1].truncated = False
traj.steps[1].done = True

for step in traj.steps:
assert step.action is not None, "Expected step.action to be non-None"
step.action.compute_grads()

opt.aggregate_trajectory(traj)

await opt.update()


@pytest.mark.asyncio
@pytest.mark.skipif(
IN_GITHUB_ACTIONS,
reason=(
"Flaky test because of the stochasticity of LLM completion; small rate limits"
),
)
async def test_openai_sft_optimizer_with_length_penalty() -> None:
agent = ReActAgent(
llm_model={"model": CILLMModelNames.OPENAI.value, "temperature": 1.0}
)
opt_config = {
"buffer_size": 10,
"return_threshold": 5.0, # Set return threshold to 5.0
}
opt = OpenAISFTOpt.from_agent(agent, config=OpenAISFTOptConfig(**opt_config))
rollout = RolloutManager(agent)

# Define a penalty function that penalizes the length of the return list
def length_penalty(length: int) -> float:
return 1 / (1 + length) # Simple penalty based on list length

# Sample trajectories from the environment
results: list[tuple[Trajectory, Any]] = await rollout.sample_trajectories(
environment_factory=lambda: DummyEnv(end_immediately=False),
max_steps=2,
batch_size=12,
)

# Modify the first trajectory to create a short trajectory with a length of 1
short_trajectory = results[0][0]
short_trajectory.steps = short_trajectory.steps[:1] # Keep only the first step
short_trajectory.steps[0].reward = 12.0 # High reward
short_trajectory.steps[0].done = True
assert (
short_trajectory.steps[0].action is not None
), "Expected step.action to be non-None"
short_trajectory.steps[0].action.compute_grads()

# Apply the penalty function when aggregating the short trajectory
opt.aggregate_trajectory(short_trajectory, len_penalty_fn=length_penalty)

# Modify the second trajectory to create a long trajectory with a length of 10
long_trajectory = results[1][0]
long_trajectory.steps *= 5 # Repeat steps to make 10
for step in long_trajectory.steps:
step.reward = 0.5 # Low reward for each step
step.truncated = False
step.done = False
assert step.action is not None, "Expected step.action to be non-None"
step.action.compute_grads()
long_trajectory.steps[-1].done = True # Mark the last step as done

# Apply the penalty function when aggregating the long trajectory
opt.aggregate_trajectory(long_trajectory, len_penalty_fn=length_penalty)

# Verify that the short trajectory is in the buffer and the long one is not
assert len(opt.train_buffer) == 1, "Expected only one trajectory in the buffer."


def mem_opt_failed(exc: BaseException) -> bool:
# Sometimes the memory opt fails to converge because the training examples
# are not informative. Try again