# Warn and merge if solvers appear in prod and barn (#411)
This PR adds default handling for the case where solvers appear in both prod and barn. In such cases, a warning is emitted and the data is merged.

At the moment, if settlements in prod and barn are settled by the same
address (or rather, if the same address is reported during bidding),
a solver can appear in multiple rows of `batch_rewards_df`. Since we
later merge on solvers, we might assign slippage or quote rewards to a
solver twice. A similar thing can happen with `quote_rewards_df`.
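A minimal sketch of the failure mode (toy addresses and values; only the `solver`, `primary_reward_eth`, and `num_quotes` columns for brevity):

```python
import pandas as pd

# The same solver address shows up once per environment in batch rewards.
batch_rewards_df = pd.DataFrame(
    {"solver": ["0xabc", "0xabc"], "primary_reward_eth": [1.0, 2.0]}
)
quote_rewards_df = pd.DataFrame({"solver": ["0xabc"], "num_quotes": [5]})

# An outer merge on a duplicated key duplicates the quote rewards row:
merged = pd.merge(quote_rewards_df, batch_rewards_df, on="solver", how="outer")
print(merged)
#   solver  num_quotes  primary_reward_eth
# 0  0xabc           5                 1.0
# 1  0xabc           5                 2.0
# num_quotes now appears twice for the same solver and would be double-counted.
```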

With this PR, duplicate entries are detected after fetching the data and
suitably merged: all numerical columns are summed, and the columns
containing lists of partner fees are concatenated, as sketched below.
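A toy version of that merge rule, mirroring the `merge_lists` helper added in `src/pg_client.py` (column names as in the diff, values invented):

```python
import pandas as pd


def merge_lists(series: pd.Series) -> list | None:
    """Concatenate list entries, skipping None; an empty result becomes None."""
    merged: list = []
    for lst in series:
        if lst is not None:
            merged.extend(lst)
    return merged if merged else None


df = pd.DataFrame(
    {
        "solver": ["0xabc", "0xabc"],
        "primary_reward_eth": [1.0, 2.0],
        "partner_list": [["0xp1"], ["0xp2"]],
    }
)
# Numerical columns are summed; list columns are concatenated per solver.
deduped = (
    df.groupby("solver")
    .agg({"primary_reward_eth": "sum", "partner_list": merge_lists})
    .reset_index()
)
print(deduped)
#   solver  primary_reward_eth  partner_list
# 0  0xabc                 3.0  [0xp1, 0xp2]
```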

The assumption of unique solvers is later checked, via assertions in
`construct_payouts`, at the point where it is required.

This is more of a hotfix; refactoring is required to make the code
understandable.

One alternative would be to implement all processing of the fetched data
in `payouts.py`, where the rest of the processing happens. With this PR
there is a mix of fetching, checking, and merging of batch rewards,
followed by checking and merging of quote rewards, followed by checking
them again in `construct_payout_dataframe`. Before this PR, there was
fetching, then merging of batch rewards, then merging of quote rewards,
then checking and merging. So, generally quite confusing.

# Testing plan

I tested the code for the accounting week 2024-10-15 -- 2024-10-22 and
it produced roughly the same amounts as were used for payments. There
was a difference of 0.003 ETH and around 100 COW. This difference is
probably because, in the actual payment, data was not merged but removed.

I have not added a test yet.
fhenneke authored Oct 28, 2024
1 parent fa1e14a commit b0e7bd5
Showing 2 changed files with 68 additions and 3 deletions.
4 changes: 4 additions & 0 deletions src/fetch/payouts.py
@@ -468,9 +468,13 @@ def construct_payouts(
     batch_rewards_df = batch_rewards_df.drop(
         ["partner_list", "partner_fee_eth"], axis=1
     )
+
+    assert batch_rewards_df["solver"].is_unique, "solver not unique in batch rewards"
+    assert quote_rewards_df["solver"].is_unique, "solver not unique in quote rewards"
     merged_df = pandas.merge(
         quote_rewards_df, batch_rewards_df, on="solver", how="outer"
     ).fillna(0)
+
     service_fee_df = pandas.DataFrame(dune.get_service_fee_status())
     service_fee_df["service_fee"] = [
         datetime.strptime(time_string, "%Y-%m-%d %H:%M:%S.%f %Z") <= dune.period.start
67 changes: 64 additions & 3 deletions src/pg_client.py
@@ -4,12 +4,15 @@
 
 
 import pandas as pd
-from pandas import DataFrame
+from pandas import DataFrame, Series
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine
 
+from src.logger import set_log
 from src.utils.query_file import open_query
+
+log = set_log(__name__)
 
 
 class MultiInstanceDBFetcher:
     """
@@ -57,7 +60,28 @@ def get_solver_rewards(self, start_block: str, end_block: str) -> DataFrame:
                 self.exec_query(query=batch_reward_query_barn, engine=engine)
             )
 
-        return pd.concat(results)
+        results_df = pd.concat(results)
+
+        # warn and merge in case of solvers in both environments
+        if not results_df["solver"].is_unique:
+            log_duplicate_rows(results_df)
+
+            results_df = (
+                results_df.groupby("solver")
+                .agg(
+                    {
+                        "primary_reward_eth": "sum",
+                        "protocol_fee_eth": "sum",
+                        "network_fee_eth": "sum",
+                        # there can be duplicate entries in partner_list now
+                        "partner_list": merge_lists,
+                        "partner_fee_eth": merge_lists,
+                    }
+                )
+                .reset_index()
+            )
+
+        return results_df
 
     def get_quote_rewards(self, start_block: str, end_block: str) -> DataFrame:
         """Returns aggregated solver quote rewards for block range"""
@@ -70,8 +94,17 @@
             self.exec_query(query=quote_reward_query, engine=engine)
             for engine in self.connections
         ]
-        return pd.concat(results)
+        results_df = pd.concat(results)
+
+        # warn and merge in case of solvers in both environments
+        if not results_df["solver"].is_unique:
+            log_duplicate_rows(results_df)
+
+            results_df = (
+                results_df.groupby("solver").agg({"num_quotes": "sum"}).reset_index()
+            )
+
+        return results_df


@@ -80,3 +113,31 @@ def pg_hex2bytea(hex_address: str) -> str:
     compatible bytea by replacing `0x` with `\\x`.
     """
     return hex_address.replace("0x", "\\x")
+
+
+def log_duplicate_rows(df: DataFrame) -> None:
+    """Log rows with duplicate solvers entries.
+    Printing defaults are changed to show all column entries."""
+    duplicated_entries = df[df["solver"].duplicated(keep=False)]
+    with pd.option_context(
+        "display.max_columns",
+        None,
+        "display.width",
+        None,
+        "display.max_colwidth",
+        None,
+    ):
+        log.warning(
+            f"Solvers found in both environments:\n {duplicated_entries}.\n"
+            "Merging results."
+        )
+
+
+def merge_lists(series: Series) -> list | None:
+    """Merges series containing lists into large list.
+    Returns None if the result would be an empty list."""
+    merged = []
+    for lst in series:
+        if lst is not None:
+            merged.extend(lst)
+    return merged if merged else None
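For reference, a quick illustrative check of the new helper's behavior (not part of the commit):

```python
from pandas import Series

# Non-None list entries are flattened into one list...
assert merge_lists(Series([["0xp1"], None, ["0xp2"]])) == ["0xp1", "0xp2"]
# ...and a series with no list entries collapses to None.
assert merge_lists(Series([None, None])) is None
```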
