Skip to content

Commit

Permalink
chore: Add endpoint to execute Shuffle workflow
Browse files Browse the repository at this point in the history
This commit adds a new endpoint `/invoke-workflow` to the Shuffle integrations routes. The endpoint allows users with the "admin" or "analyst" scope to execute a workflow by providing the workflow ID and execution arguments. The execution ID is returned in the response.
  • Loading branch information
taylorwalton committed Aug 5, 2024
1 parent 16a046d commit 9ea7a26
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
25 changes: 23 additions & 2 deletions backend/app/connectors/shuffle/routes/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from loguru import logger

from app.auth.utils import AuthHandler
from app.connectors.shuffle.schema.integrations import IntegrationRequest
from app.connectors.shuffle.services.integrations import execute_integration
from app.connectors.shuffle.schema.integrations import IntegrationRequest, ExecuteWorkflowRequest
from app.connectors.shuffle.services.integrations import execute_integration, execute_workflow

shuffle_integrations_router = APIRouter()

Expand All @@ -26,3 +26,24 @@ async def execute_integration_route(request: IntegrationRequest):
"""
logger.info("Executing workflow")
return await execute_integration(request)


@shuffle_integrations_router.post(
"/invoke-workflow",
description="Invoke a Shuffle Workflow.",
dependencies=[Security(AuthHandler().require_any_scope("admin", "analyst"))],
)
async def invoke_workflow_route(request: ExecuteWorkflowRequest):
"""
Execute a workflow.
Args:
request (IntegrationRequest): The request object containing the workflow ID.
Returns:
dict: The response containing the execution ID.
"""
logger.info("Executing workflow")
return await execute_workflow(request)


20 changes: 18 additions & 2 deletions backend/app/connectors/shuffle/schema/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from typing import Dict
from typing import List
from typing import Optional

from fastapi import HTTPException
from pydantic import BaseModel
from pydantic import Field
from pydantic import Field, root_validator


class IntegrationRequest(BaseModel):
Expand All @@ -25,3 +25,19 @@ class IntegrationRequest(BaseModel):
description="Skip the workflow",
example=True,
)

class ExecuteWorkflowRequest(BaseModel):
workflow_id: str = Field(..., description="The ID of the workflow", example="workflow_id")
execution_arguments: Optional[Dict[str, Any]] = Field(
{},
description="The execution arguments",
example={"key": "value"},
)
start: str = Field("", description="The start of the workflow", example="start")

@root_validator
def check_customer_code(cls, values):
execution_arguments = values.get('execution_arguments', {})
if 'customer_code' not in execution_arguments or not execution_arguments['customer_code']:
raise HTTPException(status_code=400, detail="customer_code is required")
return values
17 changes: 16 additions & 1 deletion backend/app/connectors/shuffle/services/integrations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from loguru import logger

from app.connectors.shuffle.schema.integrations import IntegrationRequest
from app.connectors.shuffle.schema.integrations import IntegrationRequest, ExecuteWorkflowRequest
from app.connectors.shuffle.utils.universal import send_post_request


Expand All @@ -18,3 +18,18 @@ async def execute_integration(request: IntegrationRequest) -> dict:
response = await send_post_request("/api/v1/apps/categories/run", request.dict())
logger.info(f"Response: {response}")
return response

async def execute_workflow(request: ExecuteWorkflowRequest) -> dict:
"""
Execute a workflow.
Args:
request (IntegrationRequest): The request object containing the workflow ID.
Returns:
dict: The response containing the execution ID.
"""
logger.info(f"Executing workflow: {request}")
response = await send_post_request(f"/api/v1/workflows/{request.workflow_id}/execute", request.dict())
logger.info(f"Response: {response}")
return response

0 comments on commit 9ea7a26

Please sign in to comment.