Skip to content

Commit

Permalink
add Diff and Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
NickCrews committed Jan 23, 2025
1 parent 88e04d5 commit 72e4635
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 0 deletions.
2 changes: 2 additions & 0 deletions mismo/types/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

from mismo.types._diff import Diff as Diff
from mismo.types._linked_table import Linkage as Linkage
from mismo.types._linked_table import LinkCountsTable as LinkCountsTable
from mismo.types._linked_table import LinkedTable as LinkedTable
from mismo.types._updates import Updates as Updates
69 changes: 69 additions & 0 deletions mismo/types/_diff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from __future__ import annotations

import ibis

from mismo.types import Updates


class Diff:
"""A set of insertions, updates, and deletions between two tables."""

def __init__(
self, before: ibis.Table, after: ibis.Table, *, join_on: str | None = None
):
"""Create a set of changes between two tables."""
self._before = before
self._after = after
self._join_on = join_on

@classmethod
def from_deltas(
cls,
*,
before: ibis.Table,
insertions: ibis.Table | None = None,
updates: ibis.Table | None = None,
deletions: ibis.Table | None = None,
join_on: str | None = None,
):
"""Create from a starting point and a set of transformations."""
after = before
if deletions is not None:
after = before.anti_join(deletions, join_on)
if updates is not None:
after = ibis.union(after.anti_join(updates, join_on), updates)
if insertions is not None:
after = ibis.union(after, insertions)
return cls(before, after=after, join_on=join_on)

def cache(self) -> Diff:
"""Cache the tables in the changes."""
return Diff(
before=self._before.cache(),
after=self._after.cache(),
join_on=self.join_on(),
)

def before(self) -> ibis.Table:
"""The table before the changes."""
return self._before

def after(self) -> ibis.Table:
"""The table after the changes."""
return self._after

def join_on(self) -> str:
"""The key to join the before and after tables."""
return self._join_on

def updates(self) -> Updates:
"""Rows that were updated between `before` and `after`."""
return Updates.from_tables(self._before, self._after, join_on=self.join_on())

def insertions(self) -> ibis.Table:
"""Rows that were in `after` but not in `before`."""
return self._after.anti_join(self._before, self.join_on())

def deletions(self) -> ibis.Table:
"""Rows that were in `before` but not in `after`."""
return self._before.anti_join(self._after, self.join_on())
242 changes: 242 additions & 0 deletions mismo/types/_updates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
from __future__ import annotations

from typing import Any, Iterable, Literal, TypedDict

import ibis
from ibis.expr import datatypes as dt
from ibis.expr import types as ir

from mismo import _util


class FieldUpdateDict(TypedDict):
"""A dict representing how a field in a row changed"""

before: Any
after: Any


class filters:
@staticmethod
def all_different(subset: Iterable[str] | None = None):
"""
Make a Updates filter function that gives rows where all columns are different.
Parameters
----------
subset : Iterable[str] | None
The columns to consider.
If None, we consider all columns in both before and after tables.
Examples
--------
>>> u = Updates.from_tables(before, after, join_on="id")
>>> u.filter(u.filters.all_different(["name", "age"]))
"""

def filter_func(table: ir.Table):
nonlocal subset
if subset is None:
subset = _columns_in_both(table)
preds = [table[col].before != table[col].after for col in subset]
return ibis.and_(*preds)

return filter_func

@staticmethod
def any_different(subset: Iterable[str] | None = None):
"""Make a Updates filter function that gives rows where any column is different.
Parameters
----------
subset : Iterable[str] | None
The columns to consider.
If None, we consider all columns in both before and after tables.
Examples
--------
>>> u = Updates.from_tables(before, after, join_on="id")
>>> u.filter(u.filters.any_different(["name", "age"]))
"""

def filter_func(table: ir.Table):
nonlocal subset
if subset is None:
subset = _columns_in_both(table)
preds = [table[col].before != table[col].after for col in subset]
return ibis.or_(*preds)

return filter_func


class Updates(_util.TableWrapper):
"""A Table representing how individual rows were updated.
This is a Table of structs, each struct having 'before' and 'after' fields.
It only represents rows that were present in both tables.
This is because consider these two situations:
1. A row was in the before table, but not in the after table.
2. A row was in both tables, but was all NULL in the after table.
In both cases, a good representation would be to have {before: ..., after: NULL}.
But this makes it impossible to distinguish between the two cases.
So, we only consider rows that were present in both tables.
Insertions and deletions should be handled separately.
This represents how each column has changed between two tables.
If a column only has a 'before' field, this means it was removed.
If a column only has an 'after' field, this means it was added.
If a column has both 'before' and 'after' fields, this means it was present in both tables.
""" # noqa: E501

filters = filters
"""A set of filters for convenience."""

def __init__(
self,
diff_table: ibis.Table,
*,
schema: Literal["exactly", "names", "lax"] = "exactly",
) -> None:
def _check_col(name: str):
col_type: dt.DataType = diff_table[name].type()
if not isinstance(col_type, dt.Struct):
return f"Column {name} is not a struct"
fields = set(col_type.names)
if "before" not in fields and "after" not in fields:
return f"Column {name} needs to have at least one of 'before' and 'after' field" # noqa: E501
if schema == "exactly":
if "before" not in fields or "after" not in fields:
return (
f"Column {name} needs to have both 'before' and 'after' fields"
)
b_type = col_type.fields["before"]
a_type = col_type.fields["after"]
if b_type != a_type:
return f"Column {name} needs to have the same type for 'before' ({b_type}) and 'after' ({a_type})" # noqa: E501
elif schema == "names":
if "before" not in fields or "after" not in fields:
return (
f"Column {name} needs to have both 'before' and 'after' fields"
)
elif schema == "lax":
pass
else:
raise ValueError(f"Unknown schema {schema}")

errors = [_check_col(col_name) for col_name in diff_table.columns]
errors = [e for e in errors if e is not None]
if errors:
raise ValueError("\n".join(errors))

super().__init__(diff_table)

@classmethod
def from_tables(
cls,
before: ibis.Table,
after: ibis.Table,
*,
join_on: str,
schema: Literal["exactly", "names", "lax"] = "exactly",
) -> Updates:
"""Create from two different tables by joining them on a key.
Note that this results in only the rows that are present in both tables,
due to the inner join on the key. Insertions and deletions should be
handled separately.
"""
# Prefer a column order of
# 1. all the columns in after
# 2. any extra columns in before are tacked on the end
all_columns = (dict(after.schema()) | dict(after.schema())).keys()
joined = ibis.join(
before,
after,
how="inner",
lname="{name}_l",
rname="{name}_r",
predicates=join_on,
)
joined = _ensure_suffixed(before.columns, after.columns, joined)

def make_diff_col(col: str) -> ir.StructColumn:
d = {}
col_l = col + "_l"
col_r = col + "_r"
if col_l in joined.columns:
d["before"] = joined[col_l]
if col_r in joined.columns:
d["after"] = joined[col_r]
assert d, f"Column {col} not found in either before or after"
return ibis.struct(d).name(col)

diff_table = joined.select(*[make_diff_col(c) for c in all_columns])
return cls(diff_table, schema=schema)

def before(self) -> ibis.Table:
"""The table before the changes."""
return self.select(
[
self[col].before.name(col)
for col in self.columns
if "before" in self[col].type().names
]
)

def after(self) -> ibis.Table:
"""The table after the changes."""
return self.select(
[
self[col].after.name(col)
for col in self.columns
if "after" in self[col].type().names
]
)

def filter(self, *args, **kwargs):
return self.__class__(self._t.filter(*args, **kwargs), schema="lax")

def cache(self):
return self.__class__(self._t.cache(), schema="lax")

def as_row_update_dicts(
self, chunk_size: int = 1000000
) -> Iterable[dict[str, FieldUpdateDict]]:
"""Iterate through how every row changed."""
for batch in self.to_pyarrow_batches(chunk_size=chunk_size):
yield from batch.to_pylist()


def _ensure_suffixed(
original_left_cols: Iterable[str], original_right_cols: Iterable[str], t: ir.Table
) -> ir.Table:
"""Ensure that all columns in `t` have a "_l" or "_r" suffix."""
lc = set(original_left_cols)
rc = set(original_right_cols)
just_left = lc - rc
just_right = rc - lc
m = {c + "_l": c for c in just_left} | {c + "_r": c for c in just_right}
t = t.rename(m)

# If the condition is an equality condition, like `left.name == right.name`,
# then since we are doing an inner join ibis doesn't add suffixes to these
# columns. So we need duplicate these columns and add suffixes.
un_suffixed = [
c for c in t.columns if not c.endswith("_l") and not c.endswith("_r")
]
m = {c + "_l": ibis._[c] for c in un_suffixed} | {
c + "_r": ibis._[c] for c in un_suffixed
}
t = t.mutate(**m).drop(*un_suffixed)
return t


def _columns_in_both(t: ibis.Table) -> tuple[str]:
"""The columns that were in both the original and new table."""
return tuple(
name
for name, typ in t.schema().items()
if "before" in typ.names and "after" in typ.names
)

0 comments on commit 72e4635

Please sign in to comment.