feat(back): add unique_dataset_id to import synthese
- Alembic revision adding the unique_dataset_id and src_unique_dataset_id columns to t_imports_synthese
- Insert the new field into bib_fields and cor_entity_field

Reviewed-by: andriacap
andriacap committed Mar 5, 2024
1 parent ac3fb19 commit d2c2eec
Showing 1 changed file with 126 additions and 0 deletions.
@@ -0,0 +1,126 @@
"""add column unique_dataset_id to t_imports_synthese and insert into bib_fields and cor_entity_field
Revision ID: 6e1852ecfea2
Revises: bfc90691737d
Create Date: 2024-03-04 12:31:00.861460
"""

from alembic import op
import sqlalchemy as sa
from sqlalchemy.schema import Table, MetaData

# revision identifiers, used by Alembic.
revision = "6e1852ecfea2"
down_revision = "bfc90691737d"
branch_labels = None
depends_on = ("2b0b3bd0248c",)
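# Note: depends_on declares a cross-branch dependency; Alembic will apply
# revision 2b0b3bd0248c before this one even though it is not this revision's
# direct ancestor.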


def upgrade():
    meta = MetaData(bind=op.get_bind())

    # Add columns to t_imports_synthese table
    with op.batch_alter_table("t_imports_synthese", schema="gn_imports") as batch_op:
        batch_op.add_column(sa.Column("src_unique_dataset_id", sa.String))
        batch_op.add_column(
            sa.Column("unique_dataset_id", sa.String, nullable=True)
        )  # Adjust data type if needed
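    # Note: "unique_dataset_id" is created as String here even though the
    # matching column in gn_meta.t_datasets is a UUID; that mismatch is
    # presumably what the "adjust data type" remark above refers to.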

    # Fetch id_destination for 'synthese' from bib_destinations table
    destination = Table("bib_destinations", meta, autoload=True, schema="gn_imports")
    id_dest_synthese = (
        op.get_bind()
        .execute(sa.select([destination.c.id_destination]).where(destination.c.code == "synthese"))
        .scalar()
    )

    # Fetch the existing 'unique_dataset_id' row from bib_fields, excluding its
    # primary key and destination so it can be re-inserted for 'synthese'
    field = Table("bib_fields", meta, autoload=True, schema="gn_imports")
    query = sa.select([c for c in field.c if c.name not in ["id_destination", "id_field"]]).where(
        field.c.name_field == "unique_dataset_id"
    )
    result = op.get_bind().execute(query).fetchone()
    insert_data = {"id_destination": id_dest_synthese, **result}
    insert_data.pop("id_field", None)

    # Insert the data into the 'bib_fields' table
    insert_stmt = sa.insert(field).values(insert_data).returning(field.c.id_field)
    id_field = op.get_bind().execute(insert_stmt).scalar()
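    # This re-inserts a copy of a 'unique_dataset_id' field already declared
    # for another destination; RETURNING yields the generated id_field so the
    # cor_entity_field insert below can reference the new row.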

    # Fetch id_theme_general from bib_themes table
    theme = Table("bib_themes", meta, autoload=True, schema="gn_imports")
    id_theme_general = (
        op.get_bind()
        .execute(sa.select([theme.c.id_theme]).where(theme.c.name_theme == "general_info"))
        .scalar()
    )

    # Fetch id_entity_observation for id_destination from bib_entities table
    entity = Table("bib_entities", meta, autoload=True, schema="gn_imports")
    id_entity_observation = (
        op.get_bind()
        .execute(sa.select([entity.c.id_entity]).where(entity.c.id_destination == id_dest_synthese))
        .scalar()
    )

    fields_entities = {
        "id_theme": id_theme_general,
        "order_field": 3,
        "comment": "Correspondance champs standard: metadonneeId ou jddMetaId",
    }

    # Insert data into cor_entity_field table
    cor_entity_field = Table("cor_entity_field", meta, autoload=True, schema="gn_imports")
    insert_data = {"id_entity": id_entity_observation, "id_field": id_field, **fields_entities}
    op.execute(sa.insert(cor_entity_field).values(insert_data))


def downgrade():
    meta = MetaData(bind=op.get_bind())

    # Drop columns from t_imports_synthese table
    with op.batch_alter_table("t_imports_synthese", schema="gn_imports") as batch_op:
        batch_op.drop_column("unique_dataset_id")
        batch_op.drop_column("src_unique_dataset_id")

    # Fetch id_destination for 'synthese' from bib_destinations table
    destination = Table("bib_destinations", meta, autoload=True, schema="gn_imports")
    id_dest_synthese = (
        op.get_bind()
        .execute(sa.select([destination.c.id_destination]).where(destination.c.code == "synthese"))
        .scalar()
    )

    # Fetch id_entity_observation for id_destination from bib_entities table
    entity = Table("bib_entities", meta, autoload=True, schema="gn_imports")
    id_entity_observation = (
        op.get_bind()
        .execute(sa.select([entity.c.id_entity]).where(entity.c.id_destination == id_dest_synthese))
        .scalar()
    )

    # Fetch the id_field inserted into bib_fields by upgrade()
    field = Table("bib_fields", meta, autoload=True, schema="gn_imports")
    id_field = (
        op.get_bind()
        .execute(
            sa.select([field.c.id_field]).where(
                sa.and_(
                    field.c.name_field == "unique_dataset_id",
                    field.c.id_destination == id_dest_synthese,
                )
            )
        )
        .scalar()
    )

    # Delete from cor_entity_field first: its rows reference bib_fields, so the
    # referencing row must be removed before the referenced field
    cor_entity_field = Table("cor_entity_field", meta, autoload=True, schema="gn_imports")
    op.execute(
        cor_entity_field.delete()
        .where(cor_entity_field.c.id_entity == id_entity_observation)
        .where(cor_entity_field.c.id_field == id_field)
    )

    # Delete the inserted data from bib_fields table
    op.execute(field.delete().where(field.c.id_field == id_field))
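
For reference, a minimal sketch of applying or reverting this revision through Alembic's Python API (assumptions: an alembic.ini configured for this project's migrations directory; the path below is hypothetical, and in practice the project's own tooling may wrap these steps):

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # hypothetical path to the project's Alembic config
command.upgrade(cfg, "6e1852ecfea2")  # applies the 2b0b3bd0248c dependency first
# command.downgrade(cfg, "bfc90691737d")  # reverts this revision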
