Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix betting benchmark #526

Merged
merged 11 commits into from
Oct 24, 2024
20 changes: 12 additions & 8 deletions examples/monitor/match_bets_with_langfuse_traces.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from prediction_market_agent_tooling.config import APIKeys
from prediction_market_agent_tooling.deploy.betting_strategy import (
BettingStrategy,
GuaranteedLossError,
KellyBettingStrategy,
MaxAccuracyBettingStrategy,
MaxAccuracyWithKellyScaledBetsStrategy,
Expand Down Expand Up @@ -50,14 +51,17 @@ def get_outcome_for_trace(
market = trace.market
answer = trace.answer

trades = strategy.calculate_trades(
existing_position=None,
answer=ProbabilisticAnswer(
p_yes=answer.p_yes,
confidence=answer.confidence,
),
market=market,
)
try:
trades = strategy.calculate_trades(
existing_position=None,
answer=ProbabilisticAnswer(
p_yes=answer.p_yes,
confidence=answer.confidence,
),
market=market,
)
except GuaranteedLossError:
return None
# For example, when our predicted p_yes is 95%, but market is already trading at 99%, and we don't have anything to sell, Kelly will yield no trades.
if not trades:
return None
Expand Down
245 changes: 159 additions & 86 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
AgentMarket,
FilterBy,
ProcessedMarket,
ProcessedTradedMarket,
SortBy,
)
from prediction_market_agent_tooling.markets.data_models import (
Expand Down Expand Up @@ -165,9 +166,11 @@ def session_id(self) -> str:
return f"{self.__class__.__name__} - {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}"

def __init_subclass__(cls, **kwargs: t.Any) -> None:
if "DeployableAgent" not in str(
cls.__init__
) and "DeployableTraderAgent" not in str(cls.__init__):
if (
"DeployableAgent" not in str(cls.__init__)
and "DeployableTraderAgent" not in str(cls.__init__)
and "DeployablePredictionAgent" not in str(cls.__init__)
):
raise TypeError(
"Cannot override __init__ method of deployable agent class, please override the `load` method to set up the agent."
)
Expand Down Expand Up @@ -274,28 +277,21 @@ def get_gcloud_fname(self, market_type: MarketType) -> str:
return f"{self.__class__.__name__.lower()}-{market_type}-{utcnow().strftime('%Y-%m-%d--%H-%M-%S')}"


class DeployableTraderAgent(DeployableAgent):
class DeployablePredictionAgent(DeployableAgent):
bet_on_n_markets_per_run: int = 1
min_balance_to_keep_in_native_currency: xDai | None = xdai_type(0.1)
allow_invalid_questions: bool = False
same_market_bet_interval: timedelta = timedelta(hours=24)
# Only Metaculus allows to post predictions without trading (buying/selling of outcome tokens).
supported_markets: t.Sequence[MarketType] = [MarketType.METACULUS]

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
place_bet: bool = True,
store_prediction: bool = True,
) -> None:
super().__init__(enable_langfuse=enable_langfuse)
self.place_bet = place_bet

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
if existing_position := market.get_position(user_id=user_id):
total_amount += existing_position.total_amount.amount

return MaxAccuracyBettingStrategy(bet_amount=total_amount)
self.store_prediction = store_prediction

def initialize_langfuse(self) -> None:
super().initialize_langfuse()
Expand All @@ -304,7 +300,6 @@ def initialize_langfuse(self) -> None:
self.verify_market = observe()(self.verify_market) # type: ignore[method-assign]
self.answer_binary_market = observe()(self.answer_binary_market) # type: ignore[method-assign]
self.process_market = observe()(self.process_market) # type: ignore[method-assign]
self.build_trades = observe()(self.build_trades) # type: ignore[method-assign]

def update_langfuse_trace_by_market(
self, market_type: MarketType, market: AgentMarket
Expand Down Expand Up @@ -342,19 +337,6 @@ def check_min_required_balance_to_operate(self, market_type: MarketType) -> None
f"{api_keys=} doesn't have enough operational balance."
)

def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
api_keys = APIKeys()

# Get the strategy to know how much it will bet.
strategy = self.get_betting_strategy(market)
# Have a little bandwidth after the bet.
min_required_balance_to_trade = strategy.maximum_possible_bet_amount * 1.01

if market.get_trade_balance(api_keys) < min_required_balance_to_trade:
raise OutOfFundsError(
f"Minimum required balance {min_required_balance_to_trade} for agent is not met."
)

def have_bet_on_market_since(self, market: AgentMarket, since: timedelta) -> bool:
return have_bet_on_market_since(keys=APIKeys(), market=market, since=since)

Expand Down Expand Up @@ -399,26 +381,11 @@ def get_markets(
)
return available_markets

def build_trades(
self,
market: AgentMarket,
answer: ProbabilisticAnswer,
existing_position: Position | None,
) -> list[Trade]:
strategy = self.get_betting_strategy(market=market)
trades = strategy.calculate_trades(existing_position, answer, market)
BettingStrategy.assert_trades_currency_match_markets(market, trades)
return trades

def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
self.update_langfuse_trace_by_market(market_type, market)

api_keys = APIKeys()

self.check_min_required_balance_to_trade(market)

if market_type.is_blockchain_market:
# Exchange wxdai back to xdai if the balance is getting low, so we can keep paying for fees.
if self.min_balance_to_keep_in_native_currency is not None:
Expand All @@ -434,67 +401,37 @@ def process_market(
market: AgentMarket,
verify_market: bool = True,
) -> ProcessedMarket | None:
self.update_langfuse_trace_by_market(market_type, market)
logger.info(f"Processing market {market.question=} from {market.url=}.")

self.before_process_market(market_type, market)

if verify_market and not self.verify_market(market_type, market):
logger.info(f"Market '{market.question}' doesn't meet the criteria.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

answer = self.answer_binary_market(market)

if answer is None:
logger.info(f"No answer for market '{market.question}'.")
self.update_langfuse_trace_by_processed_market(market_type, None)
return None

existing_position = market.get_position(user_id=APIKeys().bet_from_address)
trades = self.build_trades(
market=market,
answer=answer,
existing_position=existing_position,
processed_market = (
ProcessedMarket(answer=answer) if answer is not None else None
)

placed_trades = []
for trade in trades:
logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")

if self.place_bet:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_bet=}.")

processed_market = ProcessedMarket(answer=answer, trades=placed_trades)
self.update_langfuse_trace_by_processed_market(market_type, processed_market)

self.after_process_market(
market_type, market, processed_market=processed_market
logger.info(
f"Processed market {market.question=} from {market.url=} with {answer=}."
)

logger.info(f"Processed market {market.question=} from {market.url=}.")
return processed_market

def after_process_market(
self,
market_type: MarketType,
market: AgentMarket,
processed_market: ProcessedMarket,
processed_market: ProcessedMarket | None,
) -> None:
keys = APIKeys()
market.store_prediction(processed_market=processed_market, keys=keys)
if self.store_prediction:
market.store_prediction(processed_market=processed_market, keys=keys)
else:
logger.info(
f"Prediction {processed_market} not stored because {self.store_prediction=}."
)

def before_process_markets(self, market_type: MarketType) -> None:
"""
Expand All @@ -516,7 +453,9 @@ def process_markets(self, market_type: MarketType) -> None:
processed = 0

for market in available_markets:
self.before_process_market(market_type, market)
processed_market = self.process_market(market_type, market)
self.after_process_market(market_type, market, processed_market)

if processed_market is not None:
processed += 1
Expand All @@ -530,6 +469,140 @@ def after_process_markets(self, market_type: MarketType) -> None:
"Executes actions that occur after bets are placed."

def run(self, market_type: MarketType) -> None:
if market_type not in self.supported_markets:
raise ValueError(
f"Only {self.supported_markets} are supported by this agent."
)
self.before_process_markets(market_type)
self.process_markets(market_type)
self.after_process_markets(market_type)


class DeployableTraderAgent(DeployablePredictionAgent):
# These markets require place of bet, not just predictions.
supported_markets: t.Sequence[MarketType] = [
MarketType.OMEN,
MarketType.MANIFOLD,
MarketType.POLYMARKET,
]

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
store_prediction: bool = True,
store_trades: bool = True,
place_trades: bool = True,
) -> None:
super().__init__(
enable_langfuse=enable_langfuse, store_prediction=store_prediction
)
self.store_prediction = store_prediction
self.store_trades = store_trades
self.place_trades = place_trades

def initialize_langfuse(self) -> None:
super().initialize_langfuse()
# Auto-observe all the methods where it makes sense, so that subclassses don't need to do it manually.
self.get_betting_strategy = observe()(self.get_betting_strategy) # type: ignore[method-assign]
self.build_trades = observe()(self.build_trades) # type: ignore[method-assign]

def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
api_keys = APIKeys()

# Get the strategy to know how much it will bet.
strategy = self.get_betting_strategy(market)
# Have a little bandwidth after the bet.
min_required_balance_to_trade = strategy.maximum_possible_bet_amount * 1.01

if market.get_trade_balance(api_keys) < min_required_balance_to_trade:
raise OutOfFundsError(
f"Minimum required balance {min_required_balance_to_trade} for agent is not met."
)

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
if existing_position := market.get_position(user_id=user_id):
total_amount += existing_position.total_amount.amount

return MaxAccuracyBettingStrategy(bet_amount=total_amount)

def build_trades(
self,
market: AgentMarket,
answer: ProbabilisticAnswer,
existing_position: Position | None,
) -> list[Trade]:
strategy = self.get_betting_strategy(market=market)
trades = strategy.calculate_trades(existing_position, answer, market)
BettingStrategy.assert_trades_currency_match_markets(market, trades)
return trades

def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
super().before_process_market(market_type, market)
self.check_min_required_balance_to_trade(market)

def process_market(
self,
market_type: MarketType,
market: AgentMarket,
verify_market: bool = True,
) -> ProcessedTradedMarket | None:
processed_market = super().process_market(market_type, market, verify_market)
if processed_market is None:
return None

api_keys = APIKeys()
existing_position = market.get_position(
user_id=market.get_user_id(api_keys=api_keys)
)
trades = self.build_trades(
market=market,
answer=processed_market.answer,
existing_position=existing_position,
)

placed_trades = []
for trade in trades:
logger.info(f"Executing trade {trade} on market {market.id} ({market.url})")

if self.place_trades:
match trade.trade_type:
case TradeType.BUY:
id = market.buy_tokens(
outcome=trade.outcome, amount=trade.amount
)
case TradeType.SELL:
id = market.sell_tokens(
outcome=trade.outcome, amount=trade.amount
)
case _:
raise ValueError(f"Unexpected trade type {trade.trade_type}.")
placed_trades.append(PlacedTrade.from_trade(trade, id))
else:
logger.info(f"Trade execution skipped because {self.place_trades=}.")

traded_market = ProcessedTradedMarket(
answer=processed_market.answer, trades=placed_trades
)
logger.info(f"Traded market {market.question=} from {market.url=}.")
return traded_market

def after_process_market(
self,
market_type: MarketType,
market: AgentMarket,
processed_market: ProcessedMarket | None,
) -> None:
api_keys = APIKeys()
super().after_process_market(market_type, market, processed_market)
if isinstance(processed_market, ProcessedTradedMarket):
if self.store_trades:
market.store_trades(processed_market, api_keys)
else:
logger.info(
f"Trades {processed_market.trades} not stored because {self.store_trades=}."
)
6 changes: 5 additions & 1 deletion prediction_market_agent_tooling/deploy/betting_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
from prediction_market_agent_tooling.tools.utils import check_not_none


class GuaranteedLossError(RuntimeError):
pass


class BettingStrategy(ABC):
@abstractmethod
def calculate_trades(
Expand Down Expand Up @@ -63,7 +67,7 @@ def assert_buy_trade_wont_be_guaranteed_loss(
)

if outcome_tokens_to_get.amount < trade.amount.amount:
raise RuntimeError(
raise GuaranteedLossError(
f"Trade {trade=} would result in guaranteed loss by getting only {outcome_tokens_to_get=}."
)

Expand Down
Loading
Loading