Add some utils for getting traces from the langfuse client #414

Merged
merged 4 commits into from
Sep 18, 2024
Changes from 3 commits
93 changes: 93 additions & 0 deletions prediction_market_agent_tooling/tools/langfuse_client_utils.py
@@ -0,0 +1,93 @@
from datetime import datetime

from langfuse import Langfuse
from langfuse.client import TraceWithDetails

from prediction_market_agent_tooling.markets.data_models import (
    ProbabilisticAnswer,
    ResolvedBet,
    Trade,
)
from prediction_market_agent_tooling.markets.omen.omen import OmenAgentMarket
from prediction_market_agent_tooling.tools.utils import add_utc_timezone_validator


def get_traces_for_agent(
Contributor

Instead of getting traces, it might be more future-proof to get observations named process_market. Right now it's the same thing (trace of trader agent === observation of process_market), but that could change in the future, and one trace could then contain many process_market observations: gnosis/prediction-market-agent#444 (comment).

However, it works now and it's already implemented, and this is only a "maybe", so it's good by me. Just FYI.

Contributor Author

@evangriffiths Sep 18, 2024

Just tried this (client.get_observations(name="process_market", ...)), but it returns an empty list: observations have the names of spans (in red), whereas process_market is the name of the trace.

So I've added the trace_name="process_market" arg. At the moment, though, the only traces containing agent names (e.g. DeployablePredictionProphetGPT4TurboFinalAgent) are already the "process_market" traces. But I guess, as you say, it's more future-proof.

[Screenshot, 2024-09-18 15:26:13]

Contributor

But both trace and span are created using @observe()! That behaviour doesn't make sense! 😄

Thanks for trying, and sorry for wasting your time then 😢

Contributor Author

No worries - it might be something I'm doing wrong. But I'll leave it for now if that's okay :)

    agent_name: str,
    from_timestamp: datetime,
    has_output: bool,
    client: Langfuse,
) -> list[TraceWithDetails]:
    """
    Fetch agent traces using pagination
    """
    page = 1  # index starts from 1
    all_agent_traces = []
    while True:
        traces = client.fetch_traces(
            limit=100,
            page=page,
            from_timestamp=from_timestamp,
        )
        if not traces.data:
            break
        page += 1

        agent_traces = [
            t
            for t in traces.data
            if t.session_id is not None and agent_name in t.session_id
Contributor

Hmm, shall we change session_id to just the agent's name? Or we could add the agent's name as a tag and filter by that.

(Not as part of this PR; just thinking that fetching all the traces won't be very efficient in the future.)

Contributor

In the UI, they allow filtering by session id with startswith and similar fancy stuff. It didn't cross my mind to check whether they allow that in the Python SDK as well 😮‍💨

Contributor Author

Yeah I was surprised by that too. I would have guessed that the sdk they expose would be the same as what they're using for the web backend, but I guess not!

Or we could add agent's name as a tag to filter it by that.

I was going to suggest this! Will make a ticket

Contributor Author

Ticket for us here: #417

Also made an issue on the langfuse github https://github.com/orgs/langfuse/discussions/3395

        ]
        if has_output:
            agent_traces = [t for t in agent_traces if t.output is not None]
        all_agent_traces.extend(agent_traces)
    return all_agent_traces
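
The control flow of the pagination loop above can be sketched without a Langfuse connection. This is a minimal illustration, not the PR's implementation: `StubTrace`, `collect_agent_traces` and the `fetch_page` callable are hypothetical names that stand in for the real trace type and for `client.fetch_traces(...)`, so the loop can run offline.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Sequence


@dataclass
class StubTrace:
    """Hypothetical stand-in for a trace; only the fields the filter reads."""

    session_id: Optional[str]
    output: Optional[dict[str, Any]]


def collect_agent_traces(
    fetch_page: Callable[[int], Sequence[StubTrace]],
    agent_name: str,
    has_output: bool,
) -> list[StubTrace]:
    """Walk 1-indexed pages until an empty page, filtering client-side."""
    collected: list[StubTrace] = []
    page = 1
    while True:
        traces = fetch_page(page)
        if not traces:
            break  # an empty page marks the end of the results
        page += 1
        for trace in traces:
            # Keep traces whose session id mentions the agent name
            if trace.session_id is None or agent_name not in trace.session_id:
                continue
            # Optionally drop traces that never produced an output
            if has_output and trace.output is None:
                continue
            collected.append(trace)
    return collected
```

Every page is still downloaded before filtering, which is the inefficiency raised in the review thread. If the agent name were attached as a tag and the fetch call could filter on it, the session-id check would move to the server side and fewer pages would be transferred.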


def trace_to_omen_agent_market(trace: TraceWithDetails) -> OmenAgentMarket:
    assert trace.input is not None, "Trace input is None"
    assert trace.input["args"] is not None, "Trace input args is None"
    assert len(trace.input["args"]) == 2 and trace.input["args"][0] == "omen"
    return OmenAgentMarket.model_validate(trace.input["args"][1])


def trace_to_answer(trace: TraceWithDetails) -> ProbabilisticAnswer:
    assert trace.output is not None, "Trace output is None"
    assert trace.output["answer"] is not None, "Trace output result is None"
    return ProbabilisticAnswer.model_validate(trace.output["answer"])


def trace_to_trades(trace: TraceWithDetails) -> list[Trade]:
    assert trace.output is not None, "Trace output is None"
    assert trace.output["trades"] is not None, "Trace output trades is None"
    return [Trade.model_validate(t) for t in trace.output["trades"]]
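
The three `trace_to_*` helpers above rely on a particular payload shape: an input of the form `{"args": ["omen", <market dict>]}` and an output with `answer` and `trades` keys. A minimal sketch of the same input check using explicit exceptions is shown here, on the assumption that the checks should survive `python -O`, which strips `assert` statements. The function name `extract_market_payload` and the `id` field in the usage example are hypothetical, and plain dicts replace the pydantic models.

```python
from typing import Any, Optional


def extract_market_payload(trace_input: Optional[dict[str, Any]]) -> dict[str, Any]:
    """Return the raw market dict from an input shaped like
    {"args": ["omen", {...market fields...}]}; raise ValueError otherwise."""
    if trace_input is None:
        raise ValueError("Trace input is None")
    args = trace_input.get("args")
    if args is None:
        raise ValueError("Trace input args is None")
    if len(args) != 2 or args[0] != "omen":
        raise ValueError(f"Expected ['omen', <market>], got {args!r}")
    return args[1]


# Usage with an illustrative payload (the "id" field is an assumption)
market_payload = extract_market_payload({"args": ["omen", {"id": "0xabc"}]})
```

The returned dict is what would then be passed to a model validator such as `OmenAgentMarket.model_validate`.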


def get_closest_datetime_from_list(
    ref_datetime: datetime, datetimes: list[datetime]
) -> int:
    """Get the index of the closest datetime to the reference datetime"""
    if len(datetimes) == 1:
        return 0

    closest_datetime = min(datetimes, key=lambda dt: abs(dt - ref_datetime))
    return datetimes.index(closest_datetime)
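
A short usage example for `get_closest_datetime_from_list`, with the function body copied from the diff so the snippet runs on its own. The sample datetimes are made up for illustration. Note that the function returns an index rather than a datetime, and that an empty list reaches `min()` and raises `ValueError`.

```python
from datetime import datetime, timezone


def get_closest_datetime_from_list(
    ref_datetime: datetime, datetimes: list[datetime]
) -> int:
    """Get the index of the closest datetime to the reference datetime"""
    if len(datetimes) == 1:
        return 0

    closest_datetime = min(datetimes, key=lambda dt: abs(dt - ref_datetime))
    return datetimes.index(closest_datetime)


ref = datetime(2024, 9, 18, 12, 0, tzinfo=timezone.utc)
candidates = [
    datetime(2024, 9, 18, 11, 0, tzinfo=timezone.utc),  # 60 minutes away
    datetime(2024, 9, 18, 12, 5, tzinfo=timezone.utc),  # 5 minutes away
    datetime(2024, 9, 18, 14, 0, tzinfo=timezone.utc),  # 120 minutes away
]
closest_index = get_closest_datetime_from_list(ref, candidates)  # → 1
```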


def get_trace_for_bet(
    bet: ResolvedBet, traces: list[TraceWithDetails]
) -> TraceWithDetails:
    # Get traces with the same market id
    traces_for_bet = [
        t for t in traces if trace_to_omen_agent_market(t).id == bet.market_id
    ]

    # In case there are multiple traces for the same market, get the closest trace to the bet
    bet_created_time = add_utc_timezone_validator(bet.created_time)
    closest_trace_index = get_closest_datetime_from_list(
        bet_created_time,
        [t.timestamp for t in traces_for_bet],
    )
    # Sanity check - the trace should be after the bet.
    # Compare against the UTC-validated time so both sides are timezone-aware.
    assert traces_for_bet[closest_trace_index].timestamp > bet_created_time

    return traces_for_bet[closest_trace_index]
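
When a bet's creation time is compared with trace timestamps, both sides need the same timezone awareness, because Python refuses to order a naive datetime against an aware one. The sketch below illustrates this. `ensure_utc` is a hypothetical stand-in for a helper like `add_utc_timezone_validator`, whose exact behaviour is not shown in this diff; treating naive values as UTC is an assumption.

```python
from datetime import datetime, timezone


def ensure_utc(dt: datetime) -> datetime:
    """Hypothetical UTC normaliser: treat naive values as UTC (an assumption)."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    return dt.astimezone(timezone.utc)


def trace_is_after_bet(trace_timestamp: datetime, bet_created_time: datetime) -> bool:
    """Compare both values as UTC-aware datetimes."""
    return ensure_utc(trace_timestamp) > ensure_utc(bet_created_time)


naive_bet_time = datetime(2024, 9, 18, 12, 0)
aware_trace_time = datetime(2024, 9, 18, 12, 5, tzinfo=timezone.utc)

try:
    aware_trace_time > naive_bet_time
    mixed_comparison_failed = False
except TypeError:
    # Ordering a naive datetime against an aware one raises TypeError
    mixed_comparison_failed = True
```

Normalising once and reusing the result for both the closest-trace lookup and the sanity check keeps the two comparisons consistent.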