Skip to content

Commit

Permalink
[DOP-22350] Add transformations for Transfers with dataframe column f…
Browse files Browse the repository at this point in the history
…iltering
  • Loading branch information
Ilyas Gasanov committed Jan 20, 2025
1 parent 6a47361 commit 69b5dd0
Show file tree
Hide file tree
Showing 23 changed files with 357 additions and 110 deletions.
1 change: 1 addition & 0 deletions docs/changelog/next_release/186.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add transformations for **Transfers** with dataframe column filtering
5 changes: 4 additions & 1 deletion syncmaster/schemas/v1/transfers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
S3ReadTransferTarget,
)
from syncmaster.schemas.v1.transfers.strategy import FullStrategy, IncrementalStrategy
from syncmaster.schemas.v1.transfers.transformations.dataframe_columns_filter import (
DataframeColumnsFilter,
)
from syncmaster.schemas.v1.transfers.transformations.dataframe_rows_filter import (
DataframeRowsFilter,
)
Expand Down Expand Up @@ -102,7 +105,7 @@
| None
)

TransformationSchema = DataframeRowsFilter
TransformationSchema = DataframeRowsFilter | DataframeColumnsFilter


class CopyTransferSchema(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# SPDX-FileCopyrightText: 2023-2024 MTS PJSC
# SPDX-License-Identifier: Apache-2.0
from typing import Annotated, Literal

from pydantic import BaseModel, Field

from syncmaster.schemas.v1.transformation_types import DATAFRAME_COLUMNS_FILTER


class BaseColumnsFilter(BaseModel):
field: str


class IncludeFilter(BaseColumnsFilter):
type: Literal["include"]


class RenameFilter(BaseColumnsFilter):
type: Literal["rename"]
to: str


class CastFilter(BaseColumnsFilter):
type: Literal["cast"]
as_type: str


ColumnsFilter = IncludeFilter | RenameFilter | CastFilter


class DataframeColumnsFilter(BaseModel):
type: DATAFRAME_COLUMNS_FILTER
filters: list[Annotated[ColumnsFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
Original file line number Diff line number Diff line change
Expand Up @@ -7,74 +7,74 @@
from syncmaster.schemas.v1.transformation_types import DATAFRAME_ROWS_FILTER


class BaseRowFilter(BaseModel):
class BaseRowsFilter(BaseModel):
field: str


class IsNullFilter(BaseRowFilter):
class IsNullFilter(BaseRowsFilter):
type: Literal["is_null"]


class IsNotNullFilter(BaseRowFilter):
class IsNotNullFilter(BaseRowsFilter):
type: Literal["is_not_null"]


class EqualFilter(BaseRowFilter):
class EqualFilter(BaseRowsFilter):
type: Literal["equal"]
value: str


class NotEqualFilter(BaseRowFilter):
class NotEqualFilter(BaseRowsFilter):
type: Literal["not_equal"]
value: str


class GreaterThanFilter(BaseRowFilter):
class GreaterThanFilter(BaseRowsFilter):
type: Literal["greater_than"]
value: str


class GreaterOrEqualFilter(BaseRowFilter):
class GreaterOrEqualFilter(BaseRowsFilter):
type: Literal["greater_or_equal"]
value: str


class LessThanFilter(BaseRowFilter):
class LessThanFilter(BaseRowsFilter):
type: Literal["less_than"]
value: str


class LessOrEqualFilter(BaseRowFilter):
class LessOrEqualFilter(BaseRowsFilter):
type: Literal["less_or_equal"]
value: str


class LikeFilter(BaseRowFilter):
class LikeFilter(BaseRowsFilter):
type: Literal["like"]
value: str


class ILikeFilter(BaseRowFilter):
class ILikeFilter(BaseRowsFilter):
type: Literal["ilike"]
value: str


class NotLikeFilter(BaseRowFilter):
class NotLikeFilter(BaseRowsFilter):
type: Literal["not_like"]
value: str


class NotILikeFilter(BaseRowFilter):
class NotILikeFilter(BaseRowsFilter):
type: Literal["not_ilike"]
value: str


class RegexpFilter(BaseRowFilter):
class RegexpFilter(BaseRowsFilter):
type: Literal["regexp"]
value: str


RowFilter = (
RowsFilter = (
IsNullFilter
| IsNotNullFilter
| EqualFilter
Expand All @@ -93,4 +93,4 @@ class RegexpFilter(BaseRowFilter):

class DataframeRowsFilter(BaseModel):
type: DATAFRAME_ROWS_FILTER
filters: list[Annotated[RowFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
filters: list[Annotated[RowsFilter, Field(..., discriminator="type")]] = Field(default_factory=list)
1 change: 1 addition & 0 deletions syncmaster/schemas/v1/transformation_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from typing import Literal

DATAFRAME_ROWS_FILTER = Literal["dataframe_rows_filter"]
DATAFRAME_COLUMNS_FILTER = Literal["dataframe_columns_filter"]
43 changes: 35 additions & 8 deletions syncmaster/worker/handlers/db/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def read(self) -> DataFrame:
reader = DBReader(
connection=self.connection,
table=self.transfer_dto.table_name,
where=self._get_filter_expression(),
where=self._get_expression(transformation_type="dataframe_rows_filter"),
columns=self._get_expression(transformation_type="dataframe_columns_filter"),
)
return reader.run()

Expand All @@ -53,13 +54,39 @@ def write(self, df: DataFrame) -> None:
def _normalize_column_names(self, df: DataFrame) -> DataFrame: ...

@abstractmethod
def _make_filter_expression(self, filters: list[dict]) -> str | None: ...
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None: ...

def _get_filter_expression(self) -> str | None:
filters = []
def _make_columns_filter_expression(self, filters: list[dict]) -> list[str] | None:
expressions = []
for filter in filters:
filter_type = filter["type"]
field = self._make_field_brackets(filter["field"])

if filter_type == "include":
expressions.append(field)
elif filter_type == "rename":
new_name = self._make_field_brackets(filter["to"])
expressions.append(f"{field} AS {new_name}")
elif filter_type == "cast":
cast_type = filter["as_type"]
expressions.append(f"CAST({field} AS {cast_type}) AS {field}")

return expressions or None

@staticmethod
def _make_field_brackets(field: str) -> str:
return f'"{field}"'

def _get_expression(self, transformation_type: str) -> str | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filters.extend(transformation["filters"])
if filters:
return self._make_filter_expression(filters)
if transformation["type"] == transformation_type:
expressions.extend(transformation["filters"])

if expressions:
if transformation_type == "dataframe_rows_filter":
return self._make_rows_filter_expression(expressions)
elif transformation_type == "dataframe_columns_filter":
return self._make_columns_filter_expression(expressions)

return None
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._make_field_brackets(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
8 changes: 6 additions & 2 deletions syncmaster/worker/handlers/db/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f"`{filter["field"]}`"
field = self._make_field_brackets(filter["field"])
value = filter.get("value")

if value is None:
Expand All @@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
expressions.append(f"{field} {op} '{value}'")

return " AND ".join(expressions) or None

@staticmethod
def _make_field_brackets(field):
return f"`{field}`"
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/mssql.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f'"{filter["field"]}"'
field = self._make_field_brackets(filter["field"])
value = filter.get("value")

if value is None:
Expand Down
8 changes: 6 additions & 2 deletions syncmaster/worker/handlers/db/mysql.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
op = self._operators[filter["type"]]
field = f"`{filter["field"]}`"
field = self._make_field_brackets(filter["field"])
value = filter.get("value")

if value is None:
Expand All @@ -59,3 +59,7 @@ def _make_filter_expression(self, filters: list[dict]) -> str | None:
expressions.append(f"{field} {op} '{value}'")

return " AND ".join(expressions) or None

@staticmethod
def _make_field_brackets(field: str) -> str:
return f"`{field}`"
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.upper())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._make_field_brackets(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
4 changes: 2 additions & 2 deletions syncmaster/worker/handlers/db/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def _normalize_column_names(self, df: DataFrame) -> DataFrame:
df = df.withColumnRenamed(column_name, column_name.lower())
return df

def _make_filter_expression(self, filters: list[dict]) -> str | None:
def _make_rows_filter_expression(self, filters: list[dict]) -> str | None:
expressions = []
for filter in filters:
field = f'"{filter["field"]}"'
field = self._make_field_brackets(filter["field"])
op = self._operators[filter["type"]]
value = filter.get("value")

Expand Down
46 changes: 36 additions & 10 deletions syncmaster/worker/handlers/file/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ def read(self) -> DataFrame:
)
df = reader.run()

filter_expression = self._get_filter_expression()
if filter_expression:
df = df.where(filter_expression)
row_filter_expression = self._get_expression(transformation_type="dataframe_rows_filter")
if row_filter_expression:
df = df.where(row_filter_expression)

column_filter_expression = self._get_expression(transformation_type="dataframe_columns_filter")
if column_filter_expression:
df = df.selectExpr(*column_filter_expression)

return df

Expand All @@ -64,16 +68,21 @@ def write(self, df: DataFrame):

return writer.run(df=df)

def _get_filter_expression(self) -> str | None:
filters = []
def _get_expression(self, transformation_type: str) -> str | None:
expressions = []
for transformation in self.transfer_dto.transformations:
if transformation["type"] == "dataframe_rows_filter":
filters.extend(transformation["filters"])
if filters:
return self._make_filter_expression(filters)
if transformation["type"] == transformation_type:
expressions.extend(transformation["filters"])

if expressions:
if transformation_type == "dataframe_rows_filter":
return self._make_rows_filter_expression(expressions)
elif transformation_type == "dataframe_columns_filter":
return self._make_columns_filter_expression(expressions)

return None

def _make_filter_expression(self, filters: list[dict]) -> str:
def _make_rows_filter_expression(self, filters: list[dict]) -> str:
expressions = []
for filter in filters:
field = filter["field"]
Expand All @@ -83,3 +92,20 @@ def _make_filter_expression(self, filters: list[dict]) -> str:
expressions.append(f"{field} {op} '{value}'" if value is not None else f"{field} {op}")

return " AND ".join(expressions)

def _make_columns_filter_expression(self, filters: list[dict]) -> list[str] | None:
expressions = []
for filter in filters:
filter_type = filter["type"]
field = filter["field"]

if filter_type == "include":
expressions.append(field)
elif filter_type == "rename":
new_name = filter["to"]
expressions.append(f"{field} AS {new_name}")
elif filter_type == "cast":
cast_type = filter["as_type"]
expressions.append(f"CAST({field} AS {cast_type}) AS {field}")

return expressions or None
10 changes: 7 additions & 3 deletions syncmaster/worker/handlers/file/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,12 @@ def read(self) -> DataFrame:
)
df = reader.run()

filter_expression = self._get_filter_expression()
if filter_expression:
df = df.where(filter_expression)
row_filter_expression = self._get_expression(transformation_type="dataframe_rows_filter")
if row_filter_expression:
df = df.where(row_filter_expression)

column_filter_expression = self._get_expression(transformation_type="dataframe_columns_filter")
if column_filter_expression:
df = df.selectExpr(*column_filter_expression)

return df
Loading

0 comments on commit 69b5dd0

Please sign in to comment.