-
Notifications
You must be signed in to change notification settings - Fork 5
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enregistrement des dataframes entre les tâches en table dans une DB spécifique (vs xcom) #1139
base: main
Are you sure you want to change the base?
Conversation
d2f9675
to
bf0f14a
Compare
…cifique (vs xcom)
72a9c75
to
d97f412
Compare
Contexte: problème soulevé lors de mon audit d'arrivé via Etat de fonctionnement des DAGs au 2024-11-04 Et en comparant la taille DB avant/après avec la requête suivante: SELECT
DATE(timestamp) AS date,
COUNT(*) AS nombre_entrees,
ROUND(SUM(pg_column_size(value) / 1024.0 / 1024.0)) AS taille_mb
FROM xcom
GROUP BY 1
ORDER BY 1 DESC On voit la taille de la table xcom qui explose (178MB de plus le 2024-11-04 avec seulement 18 DAGs lancées): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
La direction me semble bonne:
- On créer un Engine/Variable DB séparés: après libre au gestionnaire d'infra de décider si cela doit être sur une DB logiquement/physiquement séparée, le code donne la flexibilité
- On créer des fonctions d'écriture/lecture pour sauvegarder/récupérer les données entres les tâches
Et donc pour la suite je m'attends à ce que:
- Les appels XCOM (push, pull) ne contiennent que des pointeurs vers les données, pointeurs fournis (
xcom_push
) par les tâches en amonts après l'insertion des données, et utilisés par les tâche en aval (xcom_pull
) pour aller récupérer les données
J'ai laissé des commentaires sur le nommage car je le trouve en conflit avec la logique, ou alors j'ai pas compris la logique 😄
self.django_conn_id = django_conn_id | ||
self.data_conn_id = data_conn_id | ||
self.django_engine = self._create_engine(self.django_conn_id) | ||
self.data_engine = self._create_engine(self.data_conn_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pourquoi pas un nommage plus hiérarchique avec:
conn_id_django
conn_id_data
engine_django
engine_data
table_name = _table_name(dag_id, dag_run_id, task_id, dataset_name) | ||
df.to_sql(table_name, self.data_engine, if_exists="replace", index=False) | ||
|
||
def read_data_xcom( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Je suis confu par le nommage des fonctions: j'ai l'impression que le but est justement de NE PLUS échanger de données par le XCOM (simplement les pointeurs vers ces données) et mais le nom des fonctions indique clairement l'inverse: write_data_xcom
et read_data_xcom
.
def _table_name(dag_id: str, dag_run_id: str, task_id: str, dataset_name: str): | ||
# dag_run_id remove str before __ | ||
dag_run_id = dag_run_id.split("__")[1] | ||
timestamp = datetime.strptime(dag_run_id, "%Y-%m-%dT%H:%M:%S.%f%z") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timestamp = datetime.strptime(dag_run_id, "%Y-%m-%dT%H:%M:%S.%f%z") | |
timestamp = datetime.fromisoformat(dag_run_id) |
ça fonctionne ça ?
Description succincte du problème résolu
Arrêter de passer par la table XCOM pour transférer les dataframe entre les tâches:
A discuter :
Tache à suivre :
Type de changement :
Auto-review
Les trucs à faire avant de demander une review :
.env.template
Comment tester
En local / staging :