17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -1,12 +1,29 @@
# WIP: 0.15.0

## Important new stuff:

### MetaPlane

See [design-docs/2025-12-meta-plane.md](meta-plane.md) for motivation

* Introduced `MetaPlane`/`TableMeta`/`TransformMeta` interfaces to decouple
metadata management from the compute plane
* Added SQL reference implementation (`SQLMetaPlane`, `SQLTableMeta`,
`SQLTransformMeta`) and rewired `DataStore`, `DataTable`, and batch transform
steps to consume the new meta plane API
* Added meta-plane design doc and removed legacy `MetaTable` plumbing in lints,
migrations, and tests

### InputSpec and key mapping

See [design-docs/2025-12-key-mapping.md](key-mapping.md) for motivation

* Renamed `JoinSpec` to `InputSpec`
* Added `keys` parameter to `InputSpec` and `ComputeInput` to support
joining tables with different key names
* Added `DataField` accessor for `InputSpec.keys`

## CLI improvements:
* Make CLI accept multiple `--name` values

# 0.14.6
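To make the key-mapping changelog entry concrete, here is a minimal sketch of joining a table whose key columns are named differently from the transform's keys. Only `ComputeInput`, `DataField`, and the `keys` parameter come from this PR; `orders_dt` is an assumed pre-existing `DataTable`, and all column names are invented for illustration.

```python
# Hypothetical sketch of the new `keys` parameter (table and column
# names are invented; orders_dt is an assumed existing DataTable).
from datapipe.compute import ComputeInput
from datapipe.types import DataField

# orders_dt is keyed by "order_user_id" in its meta table and carries
# a "login" column in its data table; the transform wants its keys
# named "user_id" and "login".
inp = ComputeInput(
    dt=orders_dt,
    join_type="inner",
    keys={
        "user_id": "order_user_id",   # str accessor: read from the meta table
        "login": DataField("login"),  # DataField accessor: read from the data table
    },
)

# With keys provided, the transform keys are the dict's keys
assert inp.primary_keys == ["user_id", "login"]
```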
51 changes: 48 additions & 3 deletions datapipe/compute.py
@@ -5,13 +5,14 @@
from typing import Dict, Iterable, List, Literal, Optional, Sequence, Tuple, Union

from opentelemetry import trace
from sqlalchemy import Column

from datapipe.datatable import DataStore, DataTable
from datapipe.executor import Executor, ExecutorConfig
from datapipe.run_config import RunConfig
from datapipe.store.database import TableStoreDB
from datapipe.store.table_store import TableStore
from datapipe.types import ChangeList, DataField, FieldAccessor, IndexDF, Labels, MetaSchema, TableOrName

logger = logging.getLogger("datapipe.compute")
tracer = trace.get_tracer("datapipe.compute")
@@ -86,6 +87,51 @@ class ComputeInput:
    dt: DataTable
    join_type: Literal["inner", "full"] = "full"

    # If provided, this dict describes how to read key columns from the meta
    # and data tables.
    #
    # Example: {"idx_col": DataField("data_col")} means that the value of
    # idx_col is read from column data_col of the data table.
    #
    # Example: {"idx_col": "meta_col"} means that the value of idx_col is
    # read from column meta_col of the meta table.
    keys: Optional[Dict[str, FieldAccessor]] = None

    @property
    def primary_keys(self) -> List[str]:
        if self.keys:
            return list(self.keys.keys())
        else:
            return self.dt.primary_keys

    @property
    def primary_schema(self) -> MetaSchema:
        if self.keys:
            primary_schema_dict = {col.name: col for col in self.dt.primary_schema}
            data_schema_dict = {col.name: col for col in self.dt.table_store.get_schema()}

            schema = []
            for k, accessor in self.keys.items():
                if isinstance(accessor, str):
                    # str accessors name a column of the meta table
                    source_column = primary_schema_dict[accessor]
                elif isinstance(accessor, DataField):
                    # DataField accessors name a column of the data table
                    source_column = data_schema_dict[accessor.field_name]
                else:
                    raise ValueError(f"Unknown accessor type: {type(accessor)}")

                # Expose the source column under the transform key name
                schema.append(
                    Column(
                        k,
                        source_column.type,
                        primary_key=source_column.primary_key,
                    )
                )
            return schema
        else:
            return self.dt.primary_schema


class ComputeStep:
"""
@@ -114,8 +160,7 @@ def __init__(
        self._name = name
        # Normalize input_dts: automatically wrap a bare DataTable in ComputeInput
        self.input_dts = [
            inp if isinstance(inp, ComputeInput) else ComputeInput(dt=inp, join_type="full") for inp in input_dts
        ]
        self.output_dts = output_dts
        self._labels = labels
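For reference, the aliasing step inside `ComputeInput.primary_schema` can be shown in isolation: a source column from the table schema is re-exposed under the transform key's name, copying its type and primary-key flag. The column names below are invented; only the `Column(...)` construction mirrors the code above.

```python
# Standalone sketch of the aliasing performed in ComputeInput.primary_schema
# (column names invented for illustration).
from sqlalchemy import Column, Integer

# Source column as found in the meta or data table schema
source = Column("order_user_id", Integer, primary_key=True)

# Given keys={"user_id": "order_user_id"}, the property emits a Column
# named after the transform key, preserving type and primary_key:
aliased = Column("user_id", source.type, primary_key=source.primary_key)

assert aliased.name == "user_id"
assert aliased.primary_key
```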
73 changes: 69 additions & 4 deletions datapipe/meta/base.py
@@ -1,8 +1,11 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, Dict, Iterable, Iterator, List, Literal, Optional, Sequence, Tuple

import pandas as pd
from sqlalchemy import Column

from datapipe.run_config import RunConfig
from datapipe.types import ChangeList, DataSchema, FieldAccessor, HashDF, IndexDF, MetadataDF, MetaSchema

if TYPE_CHECKING:
    from datapipe.compute import ComputeInput
@@ -166,10 +169,72 @@ def reset_metadata(
        # with self.meta_dbconn.con.begin() as con:
        #     con.execute(self.meta.sql_table.update().values(process_ts=0, update_ts=0))

    def transform_idx_to_table_idx(
        self,
        transform_idx: IndexDF,
        keys: Optional[Dict[str, FieldAccessor]] = None,
    ) -> IndexDF:
        """
        Given an index dataframe with transform keys, return an index dataframe
        with table keys, applying `keys` aliasing if provided.

        * `keys` is a mapping from transform key to the table field accessor
          that provides its value
        """

        if keys is None:
            return transform_idx

        table_key_cols: Dict[str, pd.Series] = {}
        for transform_col in transform_idx.columns:
            # Unmapped columns pass through under their own name
            accessor = keys.get(transform_col, transform_col)
            if isinstance(accessor, str):
                table_key_cols[accessor] = transform_idx[transform_col]
            else:
                # DataField accessors point at the data table, not the meta
                # table, so they cannot contribute to a meta index
                pass

        return IndexDF(pd.DataFrame(table_key_cols))


class TransformMeta:
    primary_schema: DataSchema
    primary_keys: List[str]
    transform_keys_schema: DataSchema
    transform_keys: List[str]

    @classmethod
    def compute_transform_schema(
        cls,
        input_cis: Sequence["ComputeInput"],
        output_dts: Sequence["DataTable"],
        transform_keys: Optional[List[str]],
    ) -> Tuple[List[str], MetaSchema]:
        # Hacky way to collect all the primary keys into a single set. One
        # problem not handled here: the same key may be defined differently
        # in different input tables.
        all_keys: Dict[str, Column] = {}

        for ci in input_cis:
            all_keys.update({col.name: col for col in ci.primary_schema})

        for dt in output_dts:
            all_keys.update({col.name: col for col in dt.primary_schema})

        if transform_keys is not None:
            return (transform_keys, [all_keys[k] for k in transform_keys])

        assert len(input_cis) > 0, "At least one input table is required to infer transform keys"

        inp_p_keys = set.intersection(*[set(inp.primary_keys) for inp in input_cis])
        assert len(inp_p_keys) > 0, "Input tables must share at least one primary key"

        if len(output_dts) == 0:
            return (list(inp_p_keys), [all_keys[k] for k in inp_p_keys])

        out_p_keys = set.intersection(*[set(out.primary_keys) for out in output_dts])
        assert len(out_p_keys) > 0, "Output tables must share at least one primary key"

        inp_out_p_keys = set.intersection(inp_p_keys, out_p_keys)
        assert len(inp_out_p_keys) > 0, "Input and output tables must share at least one primary key"

        return (list(inp_out_p_keys), [all_keys[k] for k in inp_out_p_keys])

    def get_changed_idx_count(
        self,
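The remapping performed by `transform_idx_to_table_idx` can be sketched standalone with pandas. The column names are invented, and the local `DataField` stands in for `datapipe.types.DataField`.

```python
# Standalone sketch of the remapping in transform_idx_to_table_idx
# (column names invented; DataField mirrors datapipe.types.DataField).
from dataclasses import dataclass

import pandas as pd


@dataclass
class DataField:
    field_name: str


transform_idx = pd.DataFrame({"user_id": [1, 2], "login": ["a", "b"]})
keys = {"user_id": "order_user_id", "login": DataField("login")}

table_key_cols = {}
for col in transform_idx.columns:
    accessor = keys.get(col, col)
    if isinstance(accessor, str):
        # str accessors name a meta-table column: rename the index column
        table_key_cols[accessor] = transform_idx[col]
    # DataField accessors read from the data table and are skipped here

table_idx = pd.DataFrame(table_key_cols)
assert list(table_idx.columns) == ["order_user_id"]
```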
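Finally, the key-inference rule in `compute_transform_schema` reduces to set intersections: explicit `transform_keys` win; otherwise the input tables' primary keys are intersected, then intersected with the output tables' keys if any outputs exist. A plain-set sketch with invented key names:

```python
# Sketch of the transform-key inference in compute_transform_schema
# (key names invented for illustration).
input_keys = [{"user_id", "item_id"}, {"user_id", "shop_id"}]
output_keys = [{"user_id"}]

# Keys shared by all inputs
inp_p_keys = set.intersection(*input_keys)
assert inp_p_keys == {"user_id"}

if output_keys:
    # Narrow further to keys shared with all outputs
    out_p_keys = set.intersection(*output_keys)
    transform_keys = inp_p_keys & out_p_keys
else:
    transform_keys = inp_p_keys

assert transform_keys == {"user_id"}
```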