Skip to content

Issue/36 dag configuration translator #71

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 33 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
2cf77c1
wip: add DAG related properties to block and project models
annabeckers Nov 11, 2024
2c2336b
style: linting and comments
annabeckers Nov 12, 2024
dc1222a
wip: project to DAG translator
annabeckers Nov 12, 2024
a66db88
wip: DAG translator
annabeckers Nov 21, 2024
5f84188
wip: Create new API endpoint to add block. Add controller to add block.
LukasBeckers Nov 21, 2024
31b5fec
wip: Add templates for DAG_translator. Implement DAG_tranlator
LukasBeckers Nov 21, 2024
ced589f
wip: DAG translator
annabeckers Nov 22, 2024
3606a65
Merge new changes
annabeckers Nov 23, 2024
55eee79
wip: DAG translator
annabeckers Nov 23, 2024
ac05d66
wip: DAG translator
annabeckers Nov 23, 2024
dbff735
wip: DAG route test
annabeckers Nov 26, 2024
a8ce123
wip: add database models for frontend development
annabeckers Nov 27, 2024
7e646c6
Merge branch 'main' of github.com:RWTH-TIME/scystream into issue/36-D…
annabeckers Nov 27, 2024
7b8afec
wip: fix relationships between new models, fix template for DAG
annabeckers Nov 27, 2024
0c93b94
wip: change DAG translator to represent changes to models
annabeckers Dec 1, 2024
0647594
wip: fix relationships between models
annabeckers Dec 10, 2024
e739e37
fix: fixed relationship between Entrypoint and InputOutput
annabeckers Dec 11, 2024
9fabe8d
wip: some additions to models, fix in translator
annabeckers Jan 7, 2025
5e6995d
wip: some fixes
annabeckers Jan 8, 2025
026923f
wip: pytest files, other fixes
annabeckers Jan 8, 2025
db8d5e0
wip: tested DAG translator
annabeckers Jan 13, 2025
e582a12
fix: migration error
annabeckers Jan 15, 2025
726ee74
fix: remove comments
annabeckers Jan 21, 2025
31853d6
fix: remove comments
annabeckers Jan 21, 2025
fb7ec87
fix: change Exception detail
annabeckers Jan 21, 2025
478e108
fix: decomment the selected_entrypoint in add_new_block
annabeckers Jan 21, 2025
86ac6df
fix: requested changes
annabeckers Jan 21, 2025
4b6630a
Merge branch 'issue/36-DAG-configuration-translator' of github.com:RW…
annabeckers Jan 21, 2025
18beb16
fix: migrations but disables DAG translator for now
annabeckers Jan 22, 2025
ccaae16
wip: changes to relationships between block and entrypoint
annabeckers Jan 22, 2025
c393cb8
fix: migrations error
annabeckers Jan 22, 2025
408ded9
fix: removed comment from code
annabeckers Jan 22, 2025
31b6481
style: resolve comments
PaulKalho Feb 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

Make sure that the core-postgres container is running.

For local development create an `.env` file inside the `/src` directory.
As an example see `src/.env.example`
For local development create an `.env` file inside the `/core` directory.
As an example see `core/.env.example`

Make sure you are in the `../core` directory

Expand All @@ -23,7 +23,7 @@ We are using [alembic](https://alembic.sqlalchemy.org/en/latest/) as our migrati
To create a migration, run

```sh
alembic revision --autogenerate -m {accurate description of what happens}
alembic revision --autogenerate -m "accurate description of what happens"
```

After creating the revision, please check `core/alembic/versions/{accurate-description}.py`
Expand Down
3 changes: 3 additions & 0 deletions core/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
from services.workflow_service.models.project import Project # noqa: F401
from services.workflow_service.models.user_project import UserProject # noqa: F401, E501
from services.workflow_service.models.block import Block # noqa: F401
from services.workflow_service.models.block import block_dependencies # noqa: F401, E501
from services.workflow_service.models.entrypoint import Entrypoint # noqa: F401, E501
from services.workflow_service.models.inputoutput import InputOutput # noqa: F401, E501

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
Expand Down
80 changes: 80 additions & 0 deletions core/alembic/versions/0320e7c7803a_add_workflow_dependencies.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""add_workflow_dependencies

Revision ID: 0320e7c7803a
Revises: 09c39621eadc
Create Date: 2025-01-22 12:01:43.151923

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '0320e7c7803a'
down_revision: Union[str, None] = '09c39621eadc'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Create the workflow DAG tables and extend blocks/projects.

    Creation order matters: ``entrypoints`` must exist before
    ``inputoutputs`` because of the ``fk_entrypoint_uuid`` foreign key.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # Self-referencing association table modelling directed edges between
    # blocks; the composite PK forbids duplicate edges and CASCADE removes
    # edges when either endpoint block is deleted.
    op.create_table('block_dependencies',
        sa.Column('upstream_block_uuid', sa.UUID(), nullable=False),
        sa.Column('downstream_block_uuid', sa.UUID(), nullable=False),
        sa.ForeignKeyConstraint(['downstream_block_uuid'], ['blocks.uuid'], name='fk_downstream_block', ondelete='CASCADE'),
        sa.ForeignKeyConstraint(['upstream_block_uuid'], ['blocks.uuid'], name='fk_upstream_block', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('upstream_block_uuid', 'downstream_block_uuid')
    )
    # Entrypoints belong to a block; deleting the block deletes them.
    op.create_table('entrypoints',
        sa.Column('uuid', sa.UUID(), nullable=False),
        sa.Column('name', sa.String(length=100), nullable=False),
        sa.Column('description', sa.String(length=500), nullable=True),
        sa.Column('envs', sa.JSON(), nullable=False),
        sa.Column('block_uuid', sa.UUID(), nullable=True),
        sa.ForeignKeyConstraint(['block_uuid'], ['blocks.uuid'], name='fk_block_uuid', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('uuid')
    )
    # Inputs/outputs hang off an entrypoint; `type` and `data_type` are
    # closed enums, `config` is a free-form JSON payload.
    op.create_table('inputoutputs',
        sa.Column('uuid', sa.UUID(), nullable=False),
        sa.Column('type', sa.Enum('INPUT', 'OUTPUT', name='inputoutputtype'), nullable=False),
        sa.Column('name', sa.String(length=100), nullable=True),
        sa.Column('data_type', sa.Enum('DBINPUT', 'FILE', name='datatype'), nullable=False),
        sa.Column('description', sa.String(length=500), nullable=True),
        sa.Column('config', sa.JSON(), nullable=False),
        sa.Column('entrypoint_uuid', sa.UUID(), nullable=True),
        sa.ForeignKeyConstraint(['entrypoint_uuid'], ['entrypoints.uuid'], name='fk_entrypoint_uuid', ondelete='CASCADE'),
        sa.PrimaryKeyConstraint('uuid')
    )
    # Airflow-related scheduling knobs and display metadata on blocks.
    # NOTE(review): custom_name / docker_image / repo_url are added as
    # NOT NULL without a server_default — this fails if `blocks` already
    # contains rows; presumably the table was empty at migration time.
    # TODO confirm.
    op.add_column('blocks', sa.Column('priority_weight', sa.Integer(), nullable=True))
    op.add_column('blocks', sa.Column('retries', sa.Integer(), nullable=True))
    op.add_column('blocks', sa.Column('retry_delay', sa.Integer(), nullable=True))
    op.add_column('blocks', sa.Column('custom_name', sa.String(length=100), nullable=False))
    op.add_column('blocks', sa.Column('description', sa.String(length=100), nullable=True))
    op.add_column('blocks', sa.Column('author', sa.String(length=100), nullable=True))
    op.add_column('blocks', sa.Column('docker_image', sa.String(length=150), nullable=False))
    op.add_column('blocks', sa.Column('repo_url', sa.String(length=100), nullable=False))
    op.add_column('blocks', sa.Column('x_pos', sa.Float(), nullable=True))
    op.add_column('blocks', sa.Column('y_pos', sa.Float(), nullable=True))
    op.add_column('projects', sa.Column('default_retries', sa.Integer(), nullable=True))
    # ### end Alembic commands ###


def downgrade() -> None:
    """Revert add_workflow_dependencies.

    Drops columns first, then tables in reverse dependency order
    (``inputoutputs`` references ``entrypoints``, which references
    ``blocks``), mirroring :func:`upgrade` exactly.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('projects', 'default_retries')
    op.drop_column('blocks', 'y_pos')
    op.drop_column('blocks', 'x_pos')
    op.drop_column('blocks', 'repo_url')
    op.drop_column('blocks', 'docker_image')
    op.drop_column('blocks', 'author')
    op.drop_column('blocks', 'description')
    op.drop_column('blocks', 'custom_name')
    op.drop_column('blocks', 'retry_delay')
    op.drop_column('blocks', 'retries')
    op.drop_column('blocks', 'priority_weight')
    op.drop_table('inputoutputs')
    op.drop_table('entrypoints')
    op.drop_table('block_dependencies')
    # ### end Alembic commands ###
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
"""add_relationship_between_block_entrypoint

Revision ID: 18f3f9de3ea4
Revises: 0320e7c7803a
Create Date: 2025-01-22 19:12:26.236527

"""
from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = '18f3f9de3ea4'
down_revision: Union[str, None] = '0320e7c7803a'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Link blocks to their selected entrypoint and tighten the back-ref.

    Adds ``blocks.selected_entrypoint_uuid`` (SET NULL on entrypoint
    deletion) and makes ``entrypoints.block_uuid`` mandatory.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('blocks', sa.Column('selected_entrypoint_uuid', sa.UUID(), nullable=True))
    op.create_foreign_key('fk_selected_entrypoint_uuid', 'blocks', 'entrypoints', ['selected_entrypoint_uuid'], ['uuid'], ondelete='SET NULL')
    # NOTE(review): tightening block_uuid to NOT NULL fails if any existing
    # entrypoint row has a NULL block_uuid — presumably none did. TODO confirm.
    op.alter_column('entrypoints', 'block_uuid',
        existing_type=sa.UUID(),
        nullable=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    """Revert add_relationship_between_block_entrypoint.

    Relaxes ``entrypoints.block_uuid`` back to nullable, then removes the
    ``selected_entrypoint`` FK and column from ``blocks``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column('entrypoints', 'block_uuid',
        existing_type=sa.UUID(),
        nullable=True)
    op.drop_constraint('fk_selected_entrypoint_uuid', 'blocks', type_='foreignkey')
    op.drop_column('blocks', 'selected_entrypoint_uuid')
    # ### end Alembic commands ###
4 changes: 4 additions & 0 deletions core/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Pytest root conftest: clear the declarative registry's table metadata
# before test collection — presumably so model modules can be re-imported
# without "Table already defined" errors. TODO confirm against test setup.
from utils.database.connection import Base


Base.metadata.clear()
4 changes: 4 additions & 0 deletions core/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from services.user_service.views import user as user_view
from services.workflow_service.views import dag as dag_view, \
project as project_view
from utils.config.environment import ENV
from utils.database.connection import engine
from fastapi import FastAPI
Expand Down Expand Up @@ -37,3 +39,5 @@ async def test_db_conn():


app.include_router(user_view.router)
app.include_router(dag_view.router)
app.include_router(project_view.router)
3 changes: 2 additions & 1 deletion core/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ alembic==1.14.0
bcrypt==4.2.0
pyjwt==2.9.0
requests==2.32.3

apache-airflow-client==2.3.0
networkx==2.8.8
109 changes: 109 additions & 0 deletions core/services/workflow_service/controllers/DAG_translator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import os
from jinja2 import Environment, FileSystemLoader
from fastapi import HTTPException
import networkx as nx
from uuid import UUID

from services.workflow_service.controllers.project_controller \
import read_project


def translate_project_to_dag(project_uuid: UUID) -> None:
    """
    Parse a project and its blocks into an Airflow DAG and write it to disk.

    Builds a directed graph of the project's blocks (one node per block,
    one edge per upstream dependency), validates that it is acyclic,
    renders the DAG through the Jinja2 templates and writes the generated
    Python file into the user's Airflow dags folder.

    Args:
        project_uuid: UUID of the project to translate.

    Raises:
        HTTPException: 404 if the project is unknown or a block has no
            selected entrypoint; 400 if the dependency graph has a cycle.
    """
    # Query the project
    project = read_project(project_uuid)

    if not project:
        raise HTTPException(status_code=404, detail="Project is not known")

    def _task_id(uuid) -> str:
        # Airflow task ids must not contain dashes.
        return f"task_{str(uuid).replace('-', '')}"

    # Create a directed graph
    graph = nx.DiGraph()

    # Add nodes (tasks). Nodes and edges MUST use the same key scheme
    # (the original keyed nodes by raw UUID but edges by task id, which
    # made the cycle check and the rendering loop see disjoint graphs).
    for block in project.blocks:
        entrypoint = block.selected_entrypoint

        if not entrypoint:
            raise HTTPException(404, detail="No entrypoint has been selected.")

        envs = entrypoint.envs
        # Merge every input/output's JSON config into one dict.
        # (The original tried `{**envs, **configs}` with `configs` a
        # *list*, which raises TypeError.)
        # NOTE(review): assumes each io.config is a dict — TODO confirm.
        configs = {}
        for io in entrypoint.inputoutputs:
            configs.update(io.config or {})

        graph.add_node(_task_id(block.uuid), **{
            "uuid": block.uuid,
            "name": block.name,
            "block_type": block.block_type.value,
            "priority": block.priority_weight,
            "retries": block.retries,
            "retry_delay": block.retry_delay,
            "environment": {**envs, **configs},
        })

    # Add edges (dependencies)
    for block in project.blocks:
        for upstream in block.upstream_blocks:
            graph.add_edge(_task_id(upstream.uuid), _task_id(block.uuid))

    # Ensure the graph is a DAG
    if not nx.is_directed_acyclic_graph(graph):
        raise HTTPException(
            status_code=400,
            detail="The project is not acyclic."
        )

    # Initialize Jinja2 environment
    base_dir = os.path.dirname(os.path.abspath(__file__))
    templates_dir = os.path.join(  # Path to the templates
        base_dir, "..", "templates"
    )

    env = Environment(loader=FileSystemLoader(templates_dir))
    dag_template = env.get_template("dag_base.py.j2")
    algorithm_template = env.get_template("algorithm_docker.py.j2")
    dependency_template = env.get_template("dependency.py.j2")

    # UUID objects have no .replace(); stringify first.
    dag_name = f"dag_{str(project_uuid).replace('-', '_')}"

    # Generate Python DAG file
    parts = [dag_template.render(dag_id=dag_name)]

    # Convert to Airflow-compatible representation
    for node, data in graph.nodes(data=True):
        parts.append(
            algorithm_template.render(
                task_id=node,
                image="scystreamworker",
                name=data["name"],
                uuid=data["uuid"],
                # `python -c`, not the original's garbled `python bv-c`.
                command=(
                    "sh -c 'python -c \"from scystream.sdk.scheduler import "
                    "Scheduler;"
                    "Scheduler.execute_function(\\\"function_name\\\")"
                    "\"'"
                ),
                project=str(project_uuid),
                algorithm=data["block_type"],
                # Template parameter is (mis)spelled 'enviroment'; the node
                # attribute is 'environment' — the original read the
                # misspelled key from `data` and raised KeyError.
                enviroment=data["environment"],
                local_storage_path_external="/tmp/scystream-data",
            )
        )

    parts.extend(
        dependency_template.render(from_task=from_task, to_task=to_task)
        for from_task, to_task in graph.edges
    )

    # Write the generated DAG to a Python file. `~` is not expanded by
    # open(), so expanduser() is required.
    filename = os.path.join(
        os.path.expanduser("~/airflow/dags"), f"{dag_name}.py"
    )

    with open(filename, "w") as f:
        f.write("\n".join(parts))
Loading
Loading