Skip to content

Commit

Permalink
[DOP-22348] Add transformations for Transfers with dataframe row filt…
Browse files Browse the repository at this point in the history
…ering
  • Loading branch information
Ilyas Gasanov committed Jan 16, 2025
1 parent 9b79921 commit f65489b
Show file tree
Hide file tree
Showing 41 changed files with 1,187 additions and 132 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/184.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add transformations for **Transfers** with dataframe row filtering
240 changes: 237 additions & 3 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ pytest-asyncio = "^0.25.1"
pytest-randomly = "^3.15.0"
pytest-deadfixtures = "^2.2.1"
pytest-mock = "^3.14.0"
pytest-lazy-fixtures = "^1.1.1"
onetl = {extras = ["spark", "s3", "hdfs"], version = "^0.12.0"}
faker = "^33.3.0"
coverage = "^7.6.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def upgrade():
sa.Column("strategy_params", sa.JSON(), nullable=False),
sa.Column("source_params", sa.JSON(), nullable=False),
sa.Column("target_params", sa.JSON(), nullable=False),
sa.Column("transformations", sa.JSON(), nullable=False),
sa.Column("is_scheduled", sa.Boolean(), nullable=False),
sa.Column("schedule", sa.String(length=32), nullable=False),
sa.Column("queue_id", sa.BigInteger(), nullable=False),
Expand Down
1 change: 1 addition & 0 deletions syncmaster/db/models/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Transfer(
strategy_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
source_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
target_params: Mapped[dict[str, Any]] = mapped_column(JSON, nullable=False, default={})
transformations: Mapped[list[dict[str, Any]]] = mapped_column(JSON, nullable=False, default=list)
is_scheduled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
schedule: Mapped[str] = mapped_column(String(32), nullable=False, default="")
queue_id: Mapped[int] = mapped_column(
Expand Down
22 changes: 13 additions & 9 deletions syncmaster/db/repositories/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ async def create(
source_params: dict[str, Any],
target_params: dict[str, Any],
strategy_params: dict[str, Any],
transformations: list[dict[str, Any]],
queue_id: int,
is_scheduled: bool,
schedule: str | None,
Expand All @@ -130,6 +131,7 @@ async def create(
source_params=source_params,
target_params=target_params,
strategy_params=strategy_params,
transformations=transformations,
queue_id=queue_id,
is_scheduled=is_scheduled,
schedule=schedule or "",
Expand All @@ -154,20 +156,21 @@ async def update(
source_params: dict[str, Any],
target_params: dict[str, Any],
strategy_params: dict[str, Any],
transformations: list[dict[str, Any]],
is_scheduled: bool | None,
schedule: str | None,
new_queue_id: int | None,
) -> Transfer:
try:
for key in transfer.source_params:
if key not in source_params or source_params[key] is None:
source_params[key] = transfer.source_params[key]
for key in transfer.target_params:
if key not in target_params or target_params[key] is None:
target_params[key] = transfer.target_params[key]
for key in transfer.strategy_params:
if key not in strategy_params or strategy_params[key] is None:
strategy_params[key] = transfer.strategy_params[key]
for old, new in [
(transfer.source_params, source_params),
(transfer.target_params, target_params),
(transfer.strategy_params, strategy_params),
]:
for key in old:
if key not in new or new[key] is None:
new[key] = old[key]

return await self._update(
Transfer.id == transfer.id,
name=name or transfer.name,
Expand All @@ -179,6 +182,7 @@ async def update(
target_connection_id=target_connection_id or transfer.target_connection_id,
source_params=source_params,
target_params=target_params,
transformations=transformations or transfer.transformations,
queue_id=new_queue_id or transfer.queue_id,
)
except IntegrityError as e:
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/dto/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TransferDTO:
@dataclass
class DBTransferDTO(TransferDTO):
table_name: str
transformations: list[dict] | None = None


@dataclass
Expand All @@ -23,6 +24,7 @@ class FileTransferDTO(TransferDTO):
file_format: CSV | JSONLine | JSON | Excel | XML | ORC | Parquet
options: dict
df_schema: dict | None = None
transformations: list[dict] | None = None

_format_parsers = {
"csv": CSV,
Expand Down
4 changes: 2 additions & 2 deletions syncmaster/schemas/v1/connections/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class CreateOracleConnectionDataSchema(BaseModel):
additional_params: dict = Field(default_factory=dict)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_connection_identifiers(cls, values):
sid, service_name = values.get("sid"), values.get("service_name")
if sid and service_name:
raise ValueError("You must specify either sid or service_name but not both")
Expand All @@ -47,7 +47,7 @@ class UpdateOracleConnectionDataSchema(BaseModel):
additional_params: dict | None = Field(default_factory=dict)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_connection_identifiers(cls, values):
sid, service_name = values.get("sid"), values.get("service_name")
if sid and service_name:
raise ValueError("You must specify either sid or service_name but not both")
Expand Down
18 changes: 16 additions & 2 deletions syncmaster/schemas/v1/transfers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from pydantic import BaseModel, Field, model_validator
from typing import Annotated

from pydantic import BaseModel, Field, field_validator, model_validator

from syncmaster.schemas.v1.connections.connection import ReadConnectionSchema
from syncmaster.schemas.v1.page import PageSchema
Expand All @@ -27,6 +29,9 @@
S3ReadTransferTarget,
)
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
DataframeRowsFilter,
)
from syncmaster.schemas.v1.types import NameConstr

ReadTransferSchemaSource = (
Expand Down Expand Up @@ -97,6 +102,8 @@
| None
)

TransformationSchema = DataframeRowsFilter


class CopyTransferSchema(BaseModel):
new_group_id: int
Expand Down Expand Up @@ -129,6 +136,9 @@ class ReadTransferSchema(BaseModel):
...,
discriminator="type",
)
transformations: list[Annotated[TransformationSchema, Field(..., discriminator="type")]] = Field(
default_factory=list,
)

class Config:
from_attributes = True
Expand Down Expand Up @@ -158,9 +168,12 @@ class CreateTransferSchema(BaseModel):
discriminator="type",
description="Incremental or archive download options",
)
transformations: list[
Annotated[TransformationSchema, Field(None, discriminator="type", description="List of transformations")]
] = Field(default_factory=list)

@model_validator(mode="before")
def check_owner_id(cls, values):
def validate_scheduling(cls, values):
is_scheduled, schedule = values.get("is_scheduled"), values.get("schedule")
if is_scheduled and schedule is None:
# TODO make checking cron string
Expand All @@ -179,6 +192,7 @@ class UpdateTransferSchema(BaseModel):
source_params: UpdateTransferSchemaSource = Field(discriminator="type", default=None)
target_params: UpdateTransferSchemaTarget = Field(discriminator="type", default=None)
strategy_params: FullStrategy | IncrementalStrategy | None = Field(discriminator="type", default=None)
transformations: list[Annotated[TransformationSchema, Field(discriminator="type", default=None)]] = None


class ReadFullTransferSchema(ReadTransferSchema):
Expand Down
2 changes: 2 additions & 0 deletions syncmaster/schemas/v1/transfers/transformations/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated, Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.transformation_types import DATAFRAME_ROWS_FILTER


class BaseRowFilter(BaseModel):
field: str


class IsNullFilter(BaseRowFilter):
type: Literal["is_null"]


class IsNotNullFilter(BaseRowFilter):
type: Literal["is_not_null"]


class EqualFilter(BaseRowFilter):
type: Literal["equal"]
value: str


class NotEqualFilter(BaseRowFilter):
type: Literal["not_equal"]
value: str


class GreaterThanFilter(BaseRowFilter):
type: Literal["greater_than"]
value: str


class GreaterOrEqualFilter(BaseRowFilter):
type: Literal["greater_or_equal"]
value: str


class LessThanFilter(BaseRowFilter):
type: Literal["less_than"]
value: str


class LessOrEqualFilter(BaseRowFilter):
type: Literal["less_or_equal"]
value: str


class LikeFilter(BaseRowFilter):
type: Literal["like"]
value: str


class ILikeFilter(BaseRowFilter):
type: Literal["ilike"]
value: str


class NotLikeFilter(BaseRowFilter):
type: Literal["not_like"]
value: str


class NotILikeFilter(BaseRowFilter):
type: Literal["not_ilike"]
value: str


class RegexpFilter(BaseRowFilter):
type: Literal["regexp"]
value: str


RowFilter = (
IsNullFilter
| IsNotNullFilter
| EqualFilter
| NotEqualFilter
| GreaterThanFilter
| GreaterOrEqualFilter
| LessThanFilter
| LessOrEqualFilter
| LikeFilter
| ILikeFilter
| NotLikeFilter
| NotILikeFilter
| RegexpFilter
)


class DataframeRowsFilter(BaseModel):
type: DATAFRAME_ROWS_FILTER
filters: list[Annotated[RowFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
5 changes: 5 additions & 0 deletions syncmaster/schemas/v1/transformation_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Literal

DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
4 changes: 4 additions & 0 deletions syncmaster/server/api/v1/transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ async def create_transfer(
source_params=transfer_data.source_params.dict(),
target_params=transfer_data.target_params.dict(),
strategy_params=transfer_data.strategy_params.dict(),
transformations=[tr.dict() for tr in transfer_data.transformations],
queue_id=transfer_data.queue_id,
is_scheduled=transfer_data.is_scheduled,
schedule=transfer_data.schedule,
Expand Down Expand Up @@ -326,6 +327,9 @@ async def update_transfer(
source_params=transfer_data.source_params.dict() if transfer_data.source_params else {},
target_params=transfer_data.target_params.dict() if transfer_data.target_params else {},
strategy_params=transfer_data.strategy_params.dict() if transfer_data.strategy_params else {},
transformations=(
[tr.dict() for tr in transfer_data.transformations] if transfer_data.transformations else []
),
is_scheduled=transfer_data.is_scheduled,
schedule=transfer_data.schedule,
new_queue_id=transfer_data.new_queue_id,
Expand Down
5 changes: 4 additions & 1 deletion syncmaster/worker/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,13 @@ def __init__(
self.source_handler = self.get_handler(
connection_data=source_connection.data,
transfer_params=run.transfer.source_params,
transformations=run.transfer.transformations,
connection_auth_data=source_auth_data,
)
self.target_handler = self.get_handler(
connection_data=target_connection.data,
transfer_params=run.transfer.target_params,
transformations=run.transfer.transformations,
connection_auth_data=target_auth_data,
)

Expand All @@ -126,6 +128,7 @@ def get_handler(
connection_data: dict[str, Any],
connection_auth_data: dict,
transfer_params: dict[str, Any],
transformations: list[dict],
) -> Handler:
connection_data.update(connection_auth_data)
connection_data.pop("type")
Expand All @@ -138,5 +141,5 @@ def get_handler(

return handler(
connection_dto=connection_dto(**connection_data),
transfer_dto=transfer_dto(**transfer_params),
transfer_dto=transfer_dto(**transfer_params, transformations=transformations),
)
31 changes: 29 additions & 2 deletions syncmaster/worker/handlers/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,26 @@
class DBHandler(Handler):
connection: BaseDBConnection
transfer_dto: DBTransferDTO
_operators = {
"is_null": "IS NULL",
"is_not_null": "IS NOT NULL",
"equal": "=",
"not_equal": "!=",
"greater_than": ">",
"greater_or_equal": ">=",
"less_than": "<",
"less_or_equal": "<=",
"like": "LIKE",
"ilike": "ILIKE",
"not_like": "NOT LIKE",
"not_ilike": "NOT ILIKE",
}

def read(self) -> DataFrame:
reader = DBReader(
connection=self.connection,
table=self.transfer_dto.table_name,
where=self._get_filter_expression(),
)
return reader.run()

Expand All @@ -32,7 +47,19 @@ def write(self, df: DataFrame) -> None:
connection=self.connection,
table=self.transfer_dto.table_name,
)
return writer.run(df=self.normalize_column_names(df))
return writer.run(df=self._normalize_column_names(df))

@abstractmethod
def normalize_column_names(self, df: DataFrame) -> DataFrame: ...
def _normalize_column_names(self, df: DataFrame) -> DataFrame: ...

@abstractmethod
def _make_filter_expression(self, filters: list[dict]) -> str | None: ...

def _get_filter_expression(self) -> str | None:
filters = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filters.extend(transformation["filters"])
if filters:
return self._make_filter_expression(filters)
return None
Loading

0 comments on commit f65489b

Please sign in to comment.