
Commit 71d2b6e

Authored by ikanashov (Ivan Kanashov) and Ivan Kanashov
MG-3470 Update to Airflow 2.4.1 and Pandas 1.5.0 (#61)
* feat: try to upgrade to Airflow 2.4
* fix: install package by pip because of python-poetry/poetry#1214
* refactor: update docker images
* refactor: update aws connections
* feat: add XCOM_WORK_KEY_PREFIX constant
* fix: copy_from doesn't work with schema.table for psycopg2 >= 2.9 (psycopg/psycopg2#1294)
* refactor: use DagRunState class instead of str
* fix: use right TaskInstance
* feat: use new XCom logic
* refactor: use schedule instead of schedule_interval
* refactor: remove create dagrun from fixture
* feat: add create_dag_run to dag_generator tests
* feat: add create_dag_run to operators tests
* feat: update pandas to 1.5.0
* fix: size of empty DataFrame changed
* fix: ports in docker-compose after review
* fix: down version to 2.1.0

Co-authored-by: Ivan Kanashov <i.kanashov@tinkoff.ru>
1 parent 5f89531 commit 71d2b6e

40 files changed: +2395 −769 lines

data-detective-airflow/Dockerfile

Lines changed: 7 additions & 5 deletions
@@ -1,6 +1,6 @@
 # Main docker building
 ARG PYTHON_VERSION=${PYTHON_VERSION:-3.9}
-FROM python:${PYTHON_VERSION}-slim-buster as base
+FROM python:${PYTHON_VERSION}-slim-bullseye as base
 
 ARG AIRFLOW_USER_UID=50000
 ENV DEBIAN_FRONTEND=noninteractive \
@@ -22,10 +22,10 @@ ENV PYTHONPATH=$PATH:${AIRFLOW_HOME} \
 
 
 # Install pip from the fuller python image (mainly because slim lacks the utilities needed for pip install)
-FROM python:${PYTHON_VERSION}-buster as builder
+FROM python:${PYTHON_VERSION}-bullseye as builder
 
 ARG AIRFLOW_USER_UID=50000
-ARG POETRY_VERSION=${POETRY_VERSION:-1.1.13}
+ARG POETRY_VERSION=${POETRY_VERSION:-1.2.2}
 
 ENV AIRFLOW_HOME=/usr/local/airflow/ \
     AIRFLOW_USER_UID=${AIRFLOW_USER_UID}
@@ -43,9 +43,11 @@ ENV POETRY_VERSION=${POETRY_VERSION} \
 
 WORKDIR ${AIRFLOW_HOME}
 COPY pyproject.toml poetry.lock ${AIRFLOW_HOME}/
+# poetry does not work without a virtualenv https://github.com/python-poetry/poetry/issues/1214
 RUN pip install --no-cache-dir "poetry==${POETRY_VERSION}" \
     && poetry config virtualenvs.create false \
-    && poetry install --no-interaction --no-ansi
+    && poetry export -f requirements.txt --with dev --output requirements.txt \
+    && pip install -r requirements.txt
 
 
 FROM base as dev
@@ -60,7 +62,7 @@ WORKDIR ${AIRFLOW_HOME}
 USER root
 RUN echo "airflow ALL = NOPASSWD: /usr/sbin/sshd" >> /etc/sudoers.d/airflow \
     && apt-get update --allow-releaseinfo-change\
-    && apt-get install -y openssh-server netcat make\
+    && apt-get install -y openssh-server netcat make libpq5\
    && apt-get clean \
    && mkdir /var/run/sshd \
    && sed -i 's/\#PermitRootLogin prohibit-password/PermitRootLogin yes/' /etc/ssh/sshd_config \

data-detective-airflow/data_detective_airflow/constants.py

Lines changed: 1 addition & 0 deletions
@@ -13,6 +13,7 @@
 WORK_S3_PREFIX: str = 'dd_airflow'
 WORK_FILE_PREFIX: str = 'wrk'
 WORK_PG_SCHEMA_PREFIX: str = 'wrk'
+XCOM_WORK_KEY_PREFIX = 'work_'
 
 # DAG
 DEFAULT_START_DATE = datetime(2020, 4, 8)

data-detective-airflow/data_detective_airflow/dag_generator/dags/python_dag.py

Lines changed: 1 addition & 1 deletion
@@ -25,7 +25,7 @@ def __init__(self, dag_dir: str, config: None):
             dag_id=Path(dag_dir).name,
             factory=self.config['factory'],
             start_date=DEFAULT_START_DATE,
-            schedule_interval=self.config['schedule_interval'],
+            schedule=self.config['schedule_interval'],
             description=self.config.get('description', ''),
             default_args=self.config['default_args'],
             template_searchpath=dag_dir,
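
For context, a minimal standalone sketch of the Airflow 2.4 API change behind this rename: DAG now takes schedule, and schedule_interval is deprecated. The dag_id and cron value below are illustrative, not taken from this repo.

from datetime import datetime
from airflow import DAG

# Airflow 2.4+: `schedule` replaces the deprecated `schedule_interval` argument.
with DAG(
    dag_id="example_dag",
    start_date=datetime(2020, 4, 8),
    schedule="@daily",
    catchup=False,
) as dag:
    ...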

data-detective-airflow/data_detective_airflow/dag_generator/dags/tdag.py

Lines changed: 8 additions & 4 deletions
@@ -1,12 +1,13 @@
 # -*- coding: utf-8 -*-
+import re
 from typing import Callable
 
 from airflow import DAG
 from airflow.models.xcom import XCom
 from airflow.utils.context import Context
 from airflow.utils.module_loading import import_string
 
-from data_detective_airflow.constants import PG_CONN_ID, S3_CONN_ID, SFTP_CONN_ID
+from data_detective_airflow.constants import PG_CONN_ID, S3_CONN_ID, SFTP_CONN_ID, XCOM_WORK_KEY_PREFIX
 from data_detective_airflow.dag_generator.results import PgResult, PickleResult
 from data_detective_airflow.dag_generator.results.base_result import BaseResult, ResultType
 from data_detective_airflow.dag_generator.works import FileWork, PgWork, S3Work, SFTPWork
@@ -93,11 +94,14 @@ def clear_all_works(self, context: Context):
     def get_all_works(self, context: Context):
         """Clearing all work on completion of execution"""
         dag_id = self.dag_id
-        execution_date = context['logical_date']
-        xcoms = XCom.get_many(task_ids='work', dag_ids=dag_id, execution_date=execution_date)
+        run_id = context['run_id']
+        xcoms = [xcom for xcom in XCom.get_many(dag_ids=dag_id, run_id=run_id)
+                 if xcom.key.startswith(XCOM_WORK_KEY_PREFIX)]
         works = set()
         for xcom in xcoms:
-            work = self.get_work(work_type=xcom.key.split('-')[0], work_conn_id=xcom.key.split('-')[1])
+            work_key = re.sub(XCOM_WORK_KEY_PREFIX, '', xcom.key).split('-', 2)
+            work_type, work_conn_id = work_key[0], work_key[1]
+            work = self.get_work(work_type=work_type, work_conn_id=work_conn_id)
             work.set_params(params=dict(xcom.value))
             works.add(work)
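
A self-contained sketch (not part of the diff) of how the new work keys are assumed to round-trip: BaseWork.get_xcom_key() in the file below produces 'work_<work_type>-<conn_id>-<task_id>', and get_all_works() strips the prefix and splits on '-' to recover the work type and connection id. The sample key is hypothetical.

XCOM_WORK_KEY_PREFIX = 'work_'

def parse_work_key(key: str) -> tuple[str, str]:
    """Recover (work_type, conn_id) from an XCom work key of the new format."""
    work_key = key.removeprefix(XCOM_WORK_KEY_PREFIX).split('-', 2)
    return work_key[0], work_key[1]

# e.g. a key written by a pg work created from task 'load_data'
assert parse_work_key('work_pg-pg-load_data') == ('pg', 'pg')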

data-detective-airflow/data_detective_airflow/dag_generator/dags/yaml_dag.py

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ def __init__(self, dag_dir: str, config: dict[str, Any]):
             dag_id=Path(dag_dir).name,
             factory=self.config['factory'],
             start_date=constants.DEFAULT_START_DATE,
-            schedule_interval=self.config.get('schedule_interval'),
+            schedule=self.config.get('schedule_interval'),
             description=self.config.get('description', ''),
             default_args=self.config['default_args'],
             template_searchpath=dag_dir,

data-detective-airflow/data_detective_airflow/dag_generator/works/base_work.py

Lines changed: 11 additions & 13 deletions
@@ -10,12 +10,12 @@
 from collections.abc import Callable
 from enum import Enum
 from functools import wraps
-from typing import Any, Dict, Optional, Text, Union
+from typing import Any, Optional, Union
 
 from airflow.models.xcom import XCom
 from airflow.utils.log.logging_mixin import LoggingMixin
 
-from data_detective_airflow.constants import CLEAR_WORK_KEY, DAG_ID_KEY, TASK_ID_KEY
+from data_detective_airflow.constants import CLEAR_WORK_KEY, DAG_ID_KEY, TASK_ID_KEY, XCOM_WORK_KEY_PREFIX
 
 
 class WorkType(Enum):
@@ -57,8 +57,6 @@ class BaseWork(ABC, LoggingMixin):
     Note: each worker will have its own instance of this class created, so one worker will not know that a work has already been created in another worker
     """
 
-    XCOM_TASK_KEY = 'work'
-
     def __init__(self, dag, work_type: str, conn_id: str = None, **kwargs):  # pylint: disable=super-init-not-called
         del kwargs
 
@@ -78,7 +76,7 @@ def __eq__(self, other):
     def __ne__(self, other):
         return not self.__eq__(other)
 
-    def get_path(self, context: Dict, prefix: str = 'work_airflow'):
+    def get_path(self, context: dict, prefix: str = 'work_airflow'):
         """Return a unique work name for this context
         (for dag_run)
         Important note! The name of the work is uniquely determined by dag_run
@@ -119,13 +117,13 @@ def clear(self, context: dict):
         self._clear_logic(context)
         self._exists = False
 
-    def get_xcom_key(self, context: Dict):
+    def get_xcom_key(self, context: dict):
         """Return the key for XCom for the current work and task"""
         task = context.get('task')
-        base_xcom_key = f'{self.work_type}-{self.conn_id}'
+        base_xcom_key = f'{XCOM_WORK_KEY_PREFIX}{self.work_type}-{self.conn_id}'
         return f'{base_xcom_key}-{task.task_id}' if task else base_xcom_key
 
-    def get_xcom_params(self, context: Dict) -> Dict:
+    def get_xcom_params(self, context: dict) -> dict:
         """Serialize work to a dictionary for writing to XCom"""
         return {
             'key': self.get_xcom_key(context),
@@ -134,12 +132,12 @@ def get_xcom_params(self, context: Dict) -> Dict:
             'value': {
                 '_exists': self._exists,
             },
-            'execution_date': context['execution_date'],
-            TASK_ID_KEY: BaseWork.XCOM_TASK_KEY,
+            'run_id': context['run_id'],
+            TASK_ID_KEY: context.get('task').task_id,
             DAG_ID_KEY: self.dag.dag_id,
         }
 
-    def _get_attrs_from_xcom(self, context: Dict) -> Dict:
+    def _get_attrs_from_xcom(self, context: dict) -> dict:
         """Return the attributes for the current task from XCom"""
         to_get = self.get_xcom_params(context)
         to_get.pop('value')
@@ -153,7 +151,7 @@ def set_params(self, params: Optional[dict[str, Any]] = None):
             setattr(self, key, value)
 
     @abstractmethod
-    def _create_logic(self, context: Dict):
+    def _create_logic(self, context: dict):
         """Create a work unique for the context.
         Important note! The inheritors need to make this method idempotent, i.e. the method can be called
         as many times as needed.
@@ -163,7 +161,7 @@ def _create_logic(self, context: Dict):
         """
 
     @abstractmethod
-    def _clear_logic(self, context: Dict):
+    def _clear_logic(self, context: dict):
         """Deinitialization"""
 
     @abstractmethod
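
To make the XCom bookkeeping above concrete, a minimal sketch (illustrative names and values, not the framework's actual objects, and assuming TASK_ID_KEY and DAG_ID_KEY resolve to 'task_id' and 'dag_id') of the dictionary get_xcom_params() now builds: rows are addressed by run_id instead of execution_date, and task_id is the creating task rather than the old fixed XCOM_TASK_KEY = 'work'.

XCOM_WORK_KEY_PREFIX = 'work_'

def sketch_xcom_params(work_type: str, conn_id: str, task_id: str,
                       dag_id: str, run_id: str, exists: bool) -> dict:
    """Approximate shape of BaseWork.get_xcom_params() after this change."""
    key = f'{XCOM_WORK_KEY_PREFIX}{work_type}-{conn_id}-{task_id}'
    return {
        'key': key,
        'value': {'_exists': exists},
        'run_id': run_id,    # previously 'execution_date'
        'task_id': task_id,  # previously the fixed value 'work'
        'dag_id': dag_id,
    }

print(sketch_xcom_params('pg', 'pg', 'load_data', 'my_dag', 'manual__2022-10-01T00:00:00', True))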

data-detective-airflow/data_detective_airflow/test_utilities/airflow.py

Lines changed: 2 additions & 1 deletion
@@ -3,6 +3,7 @@
 
 from airflow.models import DagRun, TaskInstance
 from airflow.utils.context import Context
+from airflow.utils.state import DagRunState
 from airflow.utils.types import DagRunType
 
 
@@ -18,7 +19,7 @@ def create_or_get_dagrun(dag, task) -> DagRun:
     return dag.create_dagrun(
         execution_date=datetime.now(timezone(offset=timedelta(hours=0))),
         run_type=DagRunType.MANUAL,
-        state='queued',
+        state=DagRunState.QUEUED,
         data_interval=(
             datetime.now(timezone(offset=timedelta(hours=0))),
             datetime.now(timezone(offset=timedelta(hours=0))) + timedelta(hours=1)
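
DagRunState is a str-backed enum, so the switch is behavior-neutral while removing magic strings; a quick illustration (values from airflow.utils.state):

from airflow.utils.state import DagRunState

# DagRunState subclasses str, so it still compares equal to the old raw literal.
assert DagRunState.QUEUED == 'queued'
assert DagRunState.QUEUED in ('queued', 'running')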

data-detective-airflow/data_detective_airflow/test_utilities/generate_df.py

Lines changed: 1 addition & 1 deletion
@@ -72,7 +72,7 @@ def fill_table_from_dataframe(conn: psycopg2.extensions.connection, dframe: Data
         logging.info(f'Run sql: {trunc_cmd}')
         cur.execute(trunc_cmd)
         logging.info('Fill data')
-        cur.copy_from(file=buffer, table=full_table_name, sep=sep)
+        cur.copy_expert(f"COPY {full_table_name} FROM STDIN DELIMITER '{sep}';", file=buffer)
         conn.commit()
         return True
     except psycopg2.DatabaseError as err:
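
The underlying issue (psycopg/psycopg2#1294): since psycopg2 2.9, copy_from() quotes its table argument as a single identifier, so a schema-qualified name like wrk.my_table is sent as "wrk.my_table" and fails, while copy_expert() takes the COPY statement verbatim. A standalone sketch with assumed connection parameters and an illustrative table:

import io
import psycopg2

conn = psycopg2.connect('dbname=airflow user=airflow password=airflow host=pg')
buffer = io.StringIO('1,foo\n2,bar\n')
full_table_name = 'wrk.my_table'  # schema-qualified

with conn.cursor() as cur:
    # cur.copy_from(buffer, full_table_name, sep=',')  # breaks on psycopg2 >= 2.9
    cur.copy_expert(f"COPY {full_table_name} FROM STDIN DELIMITER ',';", buffer)
conn.commit()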

data-detective-airflow/data_detective_airflow/utils/logging_thread.py

Lines changed: 1 addition & 1 deletion
@@ -7,7 +7,7 @@
 
 
 class LoggingThread(LoggingMixin):
     def __init__(self, context, interval=300):
-        super().__init__(context=context)
+        super().__init__(context=context['ti'])
         self.interval = interval
         self.dag_id = context['dag'].dag_id
         self.task_id = context['task'].task_id

data-detective-airflow/docker-compose.CeleryExecutor.yml

Lines changed: 13 additions & 11 deletions
@@ -9,9 +9,9 @@ x-airflow: &airflow
     - AIRFLOW__CELERY__WORKER_AUTOSCALE=16,12
     - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/1
     - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
-    - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
+    - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
     - AIRFLOW__WEBSERVER__SECRET_KEY=${SECRET_KEY}
-    - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
+    - AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
     - AIRFLOW__CORE__LOAD_EXAMPLES=False
 
 services:
@@ -54,7 +54,7 @@ services:
     ports:
       - "5005:5432"
     volumes:
-      - ./docker/init-pgwork.sql:/docker-entrypoint-initdb.d/init-pgwork.sql
+      - ./docker/init-pg.sql:/docker-entrypoint-initdb.d/init-pg.sql
     healthcheck:
       test: ["CMD-SHELL", "pg_isready -U airflow"]
       interval: 2s
@@ -63,14 +63,16 @@ services:
     restart: always
 
   s3:
-    image: localstack/localstack:0.11.5
+    image: localstack/localstack:1.0.4
     ports:
-      - 4566:4566
-      - 8055:8080
+      - "4566:4566"
+      - "8055:8080"
     environment:
+      - EAGER_SERVICE_LOADING=1
      - SERVICES=s3
      - DEBUG=0
-      - DATA_DIR=/tmp/localstack/data
+      # Disable LocalStack Event Analytics
+      - DISABLE_EVENTS=1
     volumes:
       - ./docker/init-s3.sh:/docker-entrypoint-initaws.d/init-s3.sh
     healthcheck:
@@ -91,11 +93,11 @@ services:
         condition: service_healthy
     volumes:
       - ./dags:${AIRFLOW_HOME}/dags
-      - ./data_detectivedata_detectivedata_detectivedata_detective_airflow:${AIRFLOW_HOME}/data_detectivedata_detectivedata_detectivedata_detective_airflow
+      - ./data_detective_airflow:${AIRFLOW_HOME}/data_detective_airflow
      - ./tests:${AIRFLOW_HOME}/tests
      - ./tests_data:${AIRFLOW_HOME}/tests_data
     ports:
-      - 127.0.0.1:9922:22
+      - "127.0.0.1:9922:22"
     command: scheduler-ssh
     restart: always
 
@@ -107,7 +109,7 @@ services:
       - ./dags:${AIRFLOW_HOME}/dags
       - ./data_detective_airflow:${AIRFLOW_HOME}/data_detective_airflow
     ports:
-      - 8080:8080
+      - "8080:8080"
     command: webserver
     healthcheck:
       test: curl -s -I -o /dev/null -w '%{http_code}' webserver:8080 | grep -qE '302|200'
@@ -121,7 +123,7 @@ services:
     depends_on:
      - scheduler
     ports:
-      - 5555:5555
+      - "5555:5555"
     command: flower
     healthcheck:
      test: ["CMD", "curl", "--fail", "http://flower:5555/"]

data-detective-airflow/docker-compose.tests.yml

Lines changed: 6 additions & 4 deletions
@@ -5,10 +5,10 @@ x-airflow: &airflow
     context: .
     target: dev
   environment:
-    - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
+    - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
     - AIRFLOW__CORE__EXECUTOR=LocalExecutor
     - AIRFLOW__WEBSERVER__SECRET_KEY=${SECRET_KEY}
-    - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
+    - AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
     - AIRFLOW__CORE__LOAD_EXAMPLES=False
     - PYTHON_VERSION=${PYTHON_VERSION}
     - POETRY_VERSION=${POETRY_VERSION}
@@ -53,11 +53,13 @@ services:
       start_period: 20s
 
   s3:
-    image: localstack/localstack:0.11.5
+    image: localstack/localstack:1.0.4
     environment:
+      - EAGER_SERVICE_LOADING=1
      - SERVICES=s3
      - DEBUG=0
-      - DATA_DIR=/tmp/localstack/data
+      # Disable LocalStack Event Analytics
+      - DISABLE_EVENTS=1
     volumes:
       - ./docker/init-s3.sh:/docker-entrypoint-initaws.d/init-s3.sh
     healthcheck:

data-detective-airflow/docker-compose.yml

Lines changed: 7 additions & 5 deletions
@@ -5,10 +5,10 @@ x-airflow: &airflow
     context: .
     target: dev
   environment:
-    - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
+    - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://${META_USER}:${META_PASS}@metadb:5432/${META_USER}
     - AIRFLOW__CORE__EXECUTOR=LocalExecutor
     - AIRFLOW__WEBSERVER__SECRET_KEY=${SECRET_KEY}
-    - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
+    - AIRFLOW__DATABASE__LOAD_DEFAULT_CONNECTIONS=False
     - AIRFLOW__CORE__LOAD_EXAMPLES=False
 
 services:
@@ -37,17 +37,19 @@ services:
     ports:
       - "5005:5432"
     volumes:
-      - ./docker/init-pgwork.sql:/docker-entrypoint-initdb.d/init-pgwork.sql
+      - ./docker/init-pg.sql:/docker-entrypoint-initdb.d/init-pg.sql
 
   s3:
-    image: localstack/localstack:0.11.5
+    image: localstack/localstack:1.0.4
     ports:
       - "4566:4566"
       - "8055:8080"
     environment:
+      - EAGER_SERVICE_LOADING=1
      - SERVICES=s3
      - DEBUG=0
-      - DATA_DIR=/tmp/localstack/data
+      # Disable LocalStack Event Analytics
+      - DISABLE_EVENTS=1
     volumes:
       - ./docker/init-s3.sh:/docker-entrypoint-initaws.d/init-s3.sh

data-detective-airflow/docker/init-connections.sh

Lines changed: 1 addition & 1 deletion
@@ -3,4 +3,4 @@
 # Here it is necessary to set all connections to test/dev servers
 airflow connections add --conn-uri 'postgres://airflow:airflow@pg:5432' pg
 airflow connections add --conn-uri 'ftp://airflow:airflow@ssh_service:22' ssh_service
-airflow connections add --conn-type s3 --conn-extra "{\"aws_access_key_id\": \"accessKey1\", \"aws_secret_access_key\":\"verySecretKey1\", \"host\":\"http://s3:4566\"}" s3
+airflow connections add --conn-type aws --conn-extra "{\"aws_access_key_id\": \"accessKey1\", \"aws_secret_access_key\":\"verySecretKey1\", \"endpoint_url\":\"http://s3:4566\"}" s3
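
For reference, a hedged sketch of what those extras amount to on the client side: an S3 client pointed at the LocalStack endpoint instead of AWS (the bucket name below is illustrative, and the amazon provider is assumed to pick up endpoint_url from the connection extra).

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://s3:4566',            # matches the 'endpoint_url' extra above
    aws_access_key_id='accessKey1',
    aws_secret_access_key='verySecretKey1',
)
s3.create_bucket(Bucket='dd-airflow-work')
print(s3.list_buckets()['Buckets'])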
