166 changes: 38 additions & 128 deletions litellm/proxy/db/db_spend_update_writer.py
@@ -10,7 +10,6 @@
import os
import time
import traceback
import random
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Dict, Literal, Optional, Union, cast, overload

@@ -20,7 +19,6 @@
from litellm.constants import DB_SPEND_UPDATE_JOB_NAME
from litellm.litellm_core_utils.safe_json_loads import safe_json_loads
from litellm.proxy._types import (
DB_CONNECTION_ERROR_TYPES,
BaseDailySpendTransaction,
DailyTagSpendTransaction,
DailyTeamSpendTransaction,
@@ -530,7 +528,14 @@ async def _commit_spend_updates_to_db_without_redis_buffer(

This is the regular flow of committing to db without using a redis buffer

Note: This flow causes Deadlocks in production (1K RPS+). Use self._commit_spend_updates_to_db_with_redis() instead if you expect 1K+ RPS.
Multi-row writes to the database should ideally always be consistently sorted to minimize the likelihood of deadlocks:
ideally the sorting order should be chosen so that writes to ALL indexes on a table happen in the same order across all
concurrent transactions, as any out-of-order concurrent write to the table or ANY index increases the chances of deadlocks.
Finding a single consistent order across multiple indexes is generally impossible, so we pick one to minimize the chance
of transient deadlocks, and retry later if we are unlucky.

Note: This flow can cause deadlocks under high load. Use self._commit_spend_updates_to_db_with_redis() instead
if you experience a high rate of deadlocks that the retry logic fails to handle.
"""

# Aggregate all in memory spend updates (key, user, end_user, team, team_member, org) and commit to db
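To make the docstring's point concrete, here is a minimal sketch (toy data; the table and column names are illustrative, not taken from this diff) of what consistently sorted multi-row writes look like: every writer walks its pending updates in the same sorted key order, so concurrent transactions acquire row locks in a consistent sequence instead of in opposite orders.

```python
# Minimal sketch of the ordering idea only; not a drop-in for the Prisma batcher used in this file.
from typing import Dict, Iterator, Tuple


def ordered_spend_updates(user_spend: Dict[str, float]) -> Iterator[Tuple[str, float]]:
    """Yield (user_id, increment) pairs in a deterministic order shared by all writers."""
    # sorted() is the whole trick: two workers flushing overlapping user_ids request their
    # row locks in the same sequence, which removes the "A waits on B, B waits on A" shape.
    yield from sorted(user_spend.items())


for user_id, increment in ordered_spend_updates({"user_b": 0.02, "user_a": 0.01}):
    # Illustrative SQL; the proxy issues the equivalent update through its ORM batcher.
    print(f"UPDATE spend_table SET spend = spend + {increment} WHERE user_id = '{user_id}'")
```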
Expand Down Expand Up @@ -600,7 +605,7 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
"""
from litellm.proxy.utils import (
ProxyUpdateSpend,
_raise_failed_update_spend_exception,
_handle_db_exception_retriable,
)

### UPDATE USER TABLE ###
@@ -622,26 +627,15 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
for (
user_id,
response_cost,
) in user_list_transactions.items():
) in sorted(user_list_transactions.items()):
batcher.litellm_usertable.update_many(
where={"user_id": user_id},
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
Contributor:
why remove the retry logic? @CAFxX

Contributor:
if the request does fail, what would happen?

Contributor Author (@CAFxX, Nov 7, 2025):
@krrishdholakia the current retry logic seems to be broken... at least this is what our monitoring tells us (a single deadlock in the database immediately causes a spend exception, without any retry -- if the logic were working correctly there should be multiple deadlocks in a row before a single spend exception), and this seems to be confirmed by the definition of DB_CONNECTION_ERROR_TYPES (I do not understand at all what those httpx exceptions are, or why they are considered relevant here; I just know that the logic does not seem to be working).

FWIW, I am not removing the retry logic; I just moved it to a single place (_handle_db_exception_retriable) -- and hopefully fixed it as well to actually handle Postgres deadlocks -- since it was duplicated with small variations in so many different places.

if (
i >= n_retry_times
): # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)
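The call above delegates to _handle_db_exception_retriable, which this diff imports from litellm.proxy.utils but does not show. Below is a hedged sketch of what such a consolidated handler could look like, assuming it pairs the retriable-error check added in exception_handler.py (further down in this PR) with the jittered exponential backoff this PR removes from the call sites; the real helper also receives start_time and proxy_logging_obj so it can log the failure before re-raising.

```python
# Hypothetical sketch only; the actual _handle_db_exception_retriable in litellm/proxy/utils.py
# may differ. _is_retriable stands in for is_database_retriable_exception added in this PR.
import asyncio
import random
import re


def _is_retriable(e: Exception) -> bool:
    # Postgres deadlocks surface as a Prisma ConnectorError/QueryError/PostgresError with code 40P01.
    pattern = r'\bConnectorError\b.*?\bQueryError\b.*?\bPostgresError\b.*?"40P01"'
    return re.search(pattern, str(e), re.DOTALL) is not None


async def _handle_db_exception_retriable(e: Exception, i: int, n_retry_times: int) -> None:
    """Re-raise non-retriable errors (or retriable ones past the retry budget); otherwise back off."""
    if not _is_retriable(e) or i >= n_retry_times:
        raise e
    # Jittered exponential backoff: deadlocked transactions are cancelled at roughly the same
    # time, so a randomized wait lowers the chance they collide again on the next attempt.
    await asyncio.sleep(random.uniform(2**i, 2 ** (i + 1)))
```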

### UPDATE END-USER TABLE ###
@@ -677,26 +671,15 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
for (
token,
response_cost,
) in key_list_transactions.items():
) in sorted(key_list_transactions.items()):
batcher.litellm_verificationtoken.update_many( # 'update_many' prevents error from being raised if no row exists
where={"token": token},
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if (
i >= n_retry_times
): # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

### UPDATE TEAM TABLE ###
@@ -718,7 +701,7 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
for (
team_id,
response_cost,
) in team_list_transactions.items():
) in sorted(team_list_transactions.items()):
verbose_proxy_logger.debug(
"Updating spend for team id={} by {}".format(
team_id, response_cost
Expand All @@ -729,20 +712,9 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if (
i >= n_retry_times
): # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

### UPDATE TEAM Membership TABLE with spend ###
@@ -768,7 +740,7 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
for (
key,
response_cost,
) in team_member_list_transactions.items():
) in sorted(team_member_list_transactions.items()):
# key is "team_id::<value>::user_id::<value>"
team_id = key.split("::")[1]
user_id = key.split("::")[3]
Expand All @@ -778,20 +750,9 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if (
i >= n_retry_times
): # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

### UPDATE ORG TABLE ###
@@ -810,33 +771,15 @@ async def _commit_spend_updates_to_db( # noqa: PLR0915
for (
org_id,
response_cost,
) in org_list_transactions.items():
) in sorted(org_list_transactions.items()):
batcher.litellm_organizationtable.update_many( # 'update_many' prevents error from being raised if no row exists
where={"organization_id": org_id},
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if (
i >= n_retry_times
): # If we've reached the maximum number of retries
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
# Optionally, sleep for a bit before retrying
await asyncio.sleep(
# Sleep a random amount to avoid retrying and deadlocking again: when two transactions deadlock they are
# cancelled basically at the same time, so if they wait the same time they will also retry at the same time
# and thus they are more likely to deadlock again.
# Instead, we sleep a random amount so that they retry at slightly different times, lowering the chance of
# repeated deadlocks, and therefore of exceeding the retry limit.
random.uniform(2**i, 2 ** (i + 1))
)
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

### UPDATE TAG TABLE ###
@@ -873,7 +816,7 @@ async def _update_entity_spend_in_db(
prisma_client: Prisma client instance
proxy_logging_obj: Proxy logging object
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception
from litellm.proxy.utils import _handle_db_exception_retriable

verbose_proxy_logger.debug(
f"{entity_name} Spend transactions: {transactions}"
@@ -886,7 +829,7 @@
timeout=timedelta(seconds=60)
) as transaction:
async with transaction.batch_() as batcher:
for entity_id, response_cost in transactions.items():
for entity_id, response_cost in sorted(transactions.items()):
verbose_proxy_logger.debug(
f"Updating spend for {entity_name} {where_field}={entity_id} by {response_cost}"
)
Expand All @@ -895,17 +838,9 @@ async def _update_entity_spend_in_db(
data={"spend": {"increment": response_cost}},
)
break
except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times:
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
await asyncio.sleep(2**i) # Exponential backoff
except Exception as e:
_raise_failed_update_spend_exception(
e=e, start_time=start_time, proxy_logging_obj=proxy_logging_obj
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

# fmt: off
@@ -971,7 +906,7 @@ async def _update_daily_spend(
"""
Generic function to update daily spend for any entity type (user, team, tag)
"""
from litellm.proxy.utils import _raise_failed_update_spend_exception
from litellm.proxy.utils import _raise_failed_update_spend_exception, _handle_db_exception_retriable

verbose_proxy_logger.debug(
f"Daily {entity_type.capitalize()} Spend transactions: {len(daily_spend_transactions)}"
@@ -984,26 +919,12 @@
try:
# Sort the transactions to minimize the probability of deadlocks by reducing the chance of concurrent
# transactions locking the same rows/ranges in different orders.
transactions_to_process = dict(
sorted(
daily_spend_transactions.items(),
# Normally to avoid deadlocks we would sort by the index, but since we have sprinkled indexes
# on our schema like we're discount Salt Bae, we just sort by all fields that have an index,
# in an ad-hoc (but hopefully sensible) order of indexes. The actual ordering matters less than
# ensuring that all concurrent transactions sort in the same order.
# We could in theory use the dict key, as it contains basically the same fields, but this is more
# robust to future changes in the key format.
# If _update_daily_spend ever gets the ability to write to multiple tables at once, the sorting
# should sort by the table first.
key=lambda x: (
x[1]["date"],
x[1].get(entity_id_field) or "",
x[1]["api_key"],
x[1]["model"],
x[1]["custom_llm_provider"],
),
)[:BATCH_SIZE]
)
# Normally to avoid deadlocks we would sort by the index, but since we have sprinkled indexes
# on our schema like we're discount Salt Bae, we just sort by the dict key. The actual ordering
# matters less than ensuring that all concurrent transactions sort in the same order.
# If _update_daily_spend ever gets the ability to write to multiple tables at once, the sorting
# should sort by the table first.
transactions_to_process = dict(sorted(daily_spend_transactions.items())[:BATCH_SIZE])

if len(transactions_to_process) == 0:
verbose_proxy_logger.debug(
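A toy run of the batching expression above (the key format and BATCH_SIZE value are illustrative, not litellm's), showing that sorting by the dict key before slicing gives every worker the same deterministic batch on every flush:

```python
# Toy illustration of dict(sorted(daily_spend_transactions.items())[:BATCH_SIZE]).
BATCH_SIZE = 2

daily_spend_transactions = {
    "2024-01-02_key-b_gpt-4": {"spend": 0.40},
    "2024-01-01_key-a_gpt-4": {"spend": 0.10},
    "2024-01-03_key-c_gpt-4": {"spend": 0.20},
}

# Sorting before slicing means concurrent workers walk overlapping keys in the same order,
# and with these toy keys the earliest dates sort (and are processed) first.
transactions_to_process = dict(sorted(daily_spend_transactions.items())[:BATCH_SIZE])
print(list(transactions_to_process))  # ['2024-01-01_key-a_gpt-4', '2024-01-02_key-b_gpt-4']
```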
Expand Down Expand Up @@ -1122,20 +1043,9 @@ async def _update_daily_spend(

break

except DB_CONNECTION_ERROR_TYPES as e:
if i >= n_retry_times:
_raise_failed_update_spend_exception(
e=e,
start_time=start_time,
proxy_logging_obj=proxy_logging_obj,
)
await asyncio.sleep(
# Sleep a random amount to avoid retrying and deadlocking again: when two transactions deadlock they are
# cancelled basically at the same time, so if they wait the same time they will also retry at the same time
# and thus they are more likely to deadlock again.
# Instead, we sleep a random amount so that they retry at slightly different times, lowering the chance of
# repeated deadlocks, and therefore of exceeding the retry limit.
random.uniform(2**i, 2 ** (i + 1))
except Exception as e:
await _handle_db_exception_retriable(
e=e, i=i, n_retry_times=n_retry_times, start_time=start_time, proxy_logging_obj=proxy_logging_obj
)

except Exception as e:
29 changes: 29 additions & 0 deletions litellm/proxy/db/exception_handler.py
@@ -43,6 +43,35 @@ def is_database_connection_error(e: Exception) -> bool:
if isinstance(e, ProxyException) and e.type == ProxyErrorTypes.no_db_connection:
return True
return False

@staticmethod
def is_database_retriable_exception(e: Exception) -> bool:
"""
Returns True if the exception is from a condition (e.g. deadlock, broken connection, etc.) that should be retried.
"""
import re

if isinstance(e, DB_CONNECTION_ERROR_TYPES): # TODO: is this actually needed?
return True
Comment on lines +54 to +55 -- Contributor Author (@CAFxX, Nov 7, 2025):
I am keeping this just because it was the old logic. I have no idea, nor a way to practically test, if this is appropriate or not.


# Deadlocks should normally be retried.
# Postgres right now, on deadlock, triggers an exception similar to:
# Error occurred during query execution: ConnectorError(ConnectorError { user_facing_error: None,
# kind: QueryError(PostgresError { code: "40P01", message: "deadlock detected", severity: "ERROR",
# detail: Some("Process 3753505 waits for ShareLock on transaction 5729447; blocked by process 3755128.\n
# Process 3755128 waits for ShareLock on transaction 5729448; blocked by process 3753505."), column: None,
# hint: Some("See server log for query details.") }), transient: false })
# Unfortunately there does not seem to be an easy way to properly parse that or otherwise detect the specific
# issue, so just match using a regular expression. This is definitely not ideal, but not much we can do about
# it.
Comment on lines +64 to +66 -- Contributor Author:
At least, I could not find a better way. Suggestions are most welcome.

if re.search(r'\bConnectorError\b.*?\bQueryError\b.*?\bPostgresError\b.*?"40P01"', str(e), re.DOTALL):
return True

# TODO: add additional specific cases (be careful to not add exceptions that should not be retried!)
# If many more additional regular expressions are added, it may make sense to combine them into a single one,
# or use something like hyperscan.

return False
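A quick self-contained check (not part of the PR) that the regular expression above matches the quoted Prisma/Postgres deadlock message and does not match an unrelated connection error:

```python
import re

DEADLOCK_PATTERN = r'\bConnectorError\b.*?\bQueryError\b.*?\bPostgresError\b.*?"40P01"'

# Abbreviated version of the deadlock message quoted in the comment above.
deadlock_msg = (
    "Error occurred during query execution: ConnectorError(ConnectorError { user_facing_error: None, "
    'kind: QueryError(PostgresError { code: "40P01", message: "deadlock detected", severity: "ERROR", '
    'detail: Some("Process 3753505 waits for ShareLock on transaction 5729447; ..."), column: None, '
    'hint: Some("See server log for query details.") }), transient: false })'
)

assert re.search(DEADLOCK_PATTERN, deadlock_msg, re.DOTALL) is not None
# An ordinary connection error should not be classified as a retriable deadlock by this check.
assert re.search(DEADLOCK_PATTERN, "connection reset by peer", re.DOTALL) is None
print("40P01 deadlock pattern behaves as expected")
```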

@staticmethod
def handle_db_exception(e: Exception):