Skip to content

Commit

Permalink
Added workflow DSL
Browse files Browse the repository at this point in the history
  • Loading branch information
seanchatmangpt committed Mar 24, 2024
1 parent ac491d4 commit d2eb614
Show file tree
Hide file tree
Showing 33 changed files with 1,499 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry] # https://python-poetry.org/docs/pyproject/
name = "dspygen"
version = "2024.3.22"
version = "2024.3.23"
description = "A Ruby on Rails style framework for the DSPy (Demonstrate, Search, Predict) project for Language Models like GPT, BERT, and LLaMA."
authors = ["Sean Chatman <info@chatmangpt.com>"]
readme = "README.md"
Expand Down
8 changes: 2 additions & 6 deletions src/dspygen/api.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
"""dspygen REST API."""
import importlib
import logging
import os
from fastapi.middleware.cors import CORSMiddleware # Import CORS middleware


import coloredlogs
Expand All @@ -19,11 +16,11 @@
import os

from dspygen.dsl.dsl_pipeline_executor import router as pipeline_router


from dspygen.experiments.control_flow.workflow_executor import router as workflow_router


app.include_router(pipeline_router)
app.include_router(workflow_router)


def load_module_routers(app: FastAPI):
Expand Down Expand Up @@ -66,4 +63,3 @@ def ping_pong():
allow_methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], # Adjust as per your requirements
allow_headers=["*"], # Adjust this to your specific headers if needed
)

Empty file.
45 changes: 45 additions & 0 deletions src/dspygen/experiments/control_flow/control_flow_workflow.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: SalesDataAnalysisWorkflow
triggers: manual
jobs:
- name: SalesDataProcessing
runner: python
steps:
- name: InitializeSalesData
code: |
sales_data = [
{'id': 1, 'amount': 150, 'product': 'Laptop', 'region': 'North'},
{'id': 2, 'amount': 90, 'product': 'Tablet', 'region': 'West'},
{'id': 3, 'amount': 200, 'product': 'Laptop', 'region': 'East'},
{'id': 4, 'amount': 30, 'product': 'Mouse', 'region': 'South'},
{'id': 5, 'amount': 120, 'product': 'Keyboard', 'region': 'North'}
]
print('Sales data initialized.')
- name: FilterHighValueSales
code: |
global high_value_sales
high_value_sales = [sale for sale in sales_data if sale['amount'] > 100]
print(f'High value sales filtered: {high_value_sales}')
if_:
expr: "len(sales_data) > 0"

- name: PrintHighValueSalesDetails
code: |
for sale in high_value_sales:
print(f"Sale ID: {sale['id']}, Amount: ${sale['amount']}, Product: {sale['product']}, Region: {sale['region']}")
loop:
over: "high_value_sales"
var: "sale"

- name: CalculateSummaryStatistics
code: |
total_sales_amount = sum(sale['amount'] for sale in high_value_sales)
average_sales_amount = total_sales_amount / len(high_value_sales)
print(f'Total sales amount from high value sales: ${total_sales_amount}')
print(f'Average sales amount from high value sales: ${average_sales_amount}')
if_:
expr: "len(high_value_sales) > 0"

- name: Summary
code: |
print('Workflow execution completed. High value sales analysis done.')
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
jobs:
- depends: null
env: null
name: SalesDataProcessing
runner: python
steps:
- args: null
code: "sales_data = [\n {'id': 1, 'amount': 150, 'product': 'Laptop', 'region': 'North'},\n {'id': 2, 'amount': 90, 'product': 'Tablet', 'region': 'West'},\n {'id': 3, 'amount': 200, 'product': 'Laptop', 'region': 'East'},\n {'id': 4, 'amount': 30, 'product': 'Mouse', 'region': 'South'},\n {'id': 5, 'amount': 120, 'product': 'Keyboard', 'region': 'North'}\n]\nprint('Sales data initialized.')\n"
cond: null
env: null
loop: null
name: InitializeSalesData
use: null
- args: null
code: 'global high_value_sales
high_value_sales = [sale for sale in sales_data if sale[''amount''] > 100]
print(f''High value sales filtered: {high_value_sales}'')
'
cond: null
env: null
loop: null
name: FilterHighValueSales
use: null
- args: null
code: "for sale in high_value_sales:\n print(f\"Sale ID: {sale['id']}, Amount: ${sale['amount']}, Product: {sale['product']}, Region: {sale['region']}\")\n"
cond: null
env: null
loop:
over: high_value_sales
var: sale
name: PrintHighValueSalesDetails
use: null
- args: null
code: 'total_sales_amount = sum(sale[''amount''] for sale in high_value_sales)
average_sales_amount = total_sales_amount / len(high_value_sales)
print(f''Total sales amount from high value sales: ${total_sales_amount}'')
print(f''Average sales amount from high value sales: ${average_sales_amount}'')
'
cond: null
env: null
loop: null
name: CalculateSummaryStatistics
use: null
- args: null
code: 'print(''Workflow execution completed. High value sales analysis done.'')
'
cond: null
env: null
loop: null
name: Summary
use: null
name: SalesDataAnalysisWorkflow
triggers: manual
132 changes: 132 additions & 0 deletions src/dspygen/experiments/control_flow/dsl_control_flow_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import json
from typing import List, Union, Dict, Any, Optional
from pydantic import BaseModel, validator, Field

from dspygen.utils.yaml_tools import YAMLMixin


class Condition(BaseModel):
    """Guard attached to a step; `expr` is evaluated against the workflow context."""

    expr: str = Field(..., description="Expression to evaluate the condition.")


class Loop(BaseModel):
    """Loop spec: iterate the result of `over` binding each item to `var`."""

    over: str = Field(..., description="Iterable expression.")
    var: str = Field(..., description="Variable name for current item.")


class Action(BaseModel):
    """A single workflow step: a module call (`use`) or inline Python (`code`)."""

    name: str
    # Module/plugin to invoke. NOTE(review): the executor only logs this; dispatch is not implemented yet.
    use: Optional[str] = None
    # Keyword arguments for `use`.
    args: Optional[Dict[str, Any]] = None
    # Inline Python source exec'd against the shared workflow context.
    code: Optional[str] = None
    # Per-step environment variables. NOTE(review): not read by the executor yet.
    env: Optional[Dict[str, str]] = None
    # Guard condition; the step is skipped when it evaluates falsy.
    cond: Optional[Condition] = Field(None)
    # Optional loop; when set, the step runs once per item of the iterable.
    loop: Optional[Loop] = None


class Job(BaseModel):
    """A named group of steps run in order by one runner."""

    name: str
    # Names of jobs that must run first. NOTE(review): not honored by execute_workflow yet.
    depends: Optional[List[str]] = None
    # Execution backend identifier (e.g. "python"); informational for now.
    runner: str
    steps: List[Action]
    # Job-level environment merged into the context before steps run.
    env: Optional[Dict[str, str]] = None


class Workflow(BaseModel, YAMLMixin):
    """Top-level workflow document; YAMLMixin supplies from_yaml/to_yaml."""

    name: str
    # Trigger spec, e.g. "manual" or a list of trigger names.
    triggers: Union[str, List[str]]
    jobs: List[Job]


def evaluate_condition(condition: str, context: Dict[str, Any]) -> bool:
    """Evaluate a guard *condition* expression against *context*.

    A failing expression is reported on stdout and treated as ``False``,
    so a broken guard skips its step instead of aborting the workflow.

    NOTE(review): ``eval`` runs arbitrary code — workflow definitions must
    come from a trusted source.
    """
    try:
        return eval(condition, {}, context)
    except Exception as exc:
        print(f"Error evaluating condition '{condition}': {exc}")
        return False


def execute_action(action: Action, context: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a single action and return the updated context.

    The action is skipped when its ``cond`` guard evaluates falsy.  A
    ``use`` action is currently only logged (module dispatch is not yet
    implemented).  A ``code`` action is exec'd with *context* as the
    globals mapping — so ``global`` statements in step code write straight
    into the shared workflow context — and any new local names are merged
    back into *context* afterwards.

    Fix: the original built a ``new_context = context.copy()`` that was
    only ever returned on the skip path and dead on every other path;
    the copy and the ``global_context`` alias are removed.

    NOTE(review): ``exec`` runs arbitrary code — workflow definitions must
    come from a trusted source.
    """
    # Guard: skip the whole action when its condition is falsy.
    if action.cond and not evaluate_condition(action.cond.expr, context):
        print(f"Skipping action {action.name} due to condition")
        return context

    if action.use:
        # Module dispatch is a stub for now; only log the intent.
        print(f"Executing module {action.use} with args {action.args}")
    elif action.code:
        # Isolated locals, shared globals: step code reads/writes the
        # workflow context directly via the globals mapping.
        local_context: Dict[str, Any] = {}
        exec(action.code, context, local_context)

        # Merge locals created by the step back into the shared context.
        context.update(local_context)

    return context


def execute_loop(loop: Loop, actions: List[Action], context: Dict[str, Any]) -> Dict[str, Any]:
    """Run *actions* once per item of the loop's iterable.

    Each iteration works on a fresh copy of *context* with the loop
    variable bound to the current item, so one iteration's changes do not
    leak into the next; the context of the final iteration is returned.

    Fix: the original left ``loop_context`` unbound (UnboundLocalError)
    when the iterable was empty — it now returns *context* unchanged.

    NOTE(review): ``eval`` on ``loop.over`` runs arbitrary code — trusted
    workflow definitions only.
    """
    items = eval(loop.over, {}, context)
    loop_context = context  # fallback for an empty iterable
    for item in items:
        loop_context = context.copy()
        loop_context[loop.var] = item
        for action in actions:
            # Ensure loop_context is updated with each action's changes.
            loop_context = execute_action(action, loop_context)
    return loop_context


def execute_job(job: Job, global_context: Dict[str, Any]) -> Dict[str, Any]:
    """Run every step of *job* in order, honoring conditions and loops.

    Returns the context produced by the final step.
    """
    # Job-level env vars overlay the incoming global context.
    job_context = dict(global_context)
    job_context.update(job.env or {})
    for step in job.steps:
        if step.loop:
            job_context = execute_loop(step.loop, [step], job_context)
        else:
            job_context = execute_action(step, job_context)
    return job_context


def execute_workflow(workflow: Workflow, init_ctx: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Orchestrate the execution of all jobs in a workflow.

    Fix: when *init_ctx* is None (the default — ``main()`` calls this with
    no context) the original passed None to ``execute_job``, which crashed
    on ``{**global_context, ...}``; it now defaults to an empty dict.

    Returns the final context after the last job.
    """
    global_context: Dict[str, Any] = {} if init_ctx is None else init_ctx
    for job in workflow.jobs:
        # TODO(review): job.depends is not yet honored; jobs run in file order.
        global_context = execute_job(job, global_context)
    print("Workflow execution completed.")
    return global_context


def serialize_context(context):
    """Return a copy of *context* that is safe for ``json.dumps``.

    Values that already serialize are kept as-is; anything that raises
    TypeError/ValueError under ``json.dumps`` is replaced by ``str(value)``.
    """
    safe = {}
    for key, value in context.items():
        try:
            json.dumps(value)  # probe: is this value JSON-serializable?
        except (TypeError, ValueError):
            safe[key] = str(value)
        else:
            safe[key] = value
    return safe


def main():
    """Load the demo workflow from YAML, run it, and dump the model back out.

    NOTE(review): both paths are relative to the current working directory,
    so this only works when run from this file's directory.
    """
    wf = Workflow.from_yaml("control_flow_workflow.yaml")
    execute_workflow(wf)
    wf.to_yaml("control_flow_workflow_output_new.yaml")


# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
46 changes: 46 additions & 0 deletions src/dspygen/experiments/control_flow/workflow_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import json
import os
import tempfile
from typing import Optional

from fastapi import APIRouter, HTTPException
from pydantic import BaseModel

from dspygen.experiments.control_flow.dsl_control_flow_models import execute_workflow, Workflow, serialize_context

router = APIRouter()


class WorkflowRequest(BaseModel):
    """Request body for the /execute_workflow/ endpoint."""

    # Raw YAML text of the workflow definition.
    yaml_content: str
    # Optional initial execution context forwarded to execute_workflow.
    init_ctx: Optional[dict] = None


@router.post("/execute_workflow/")
async def run_workflow(request: WorkflowRequest):
    """Execute a YAML-defined workflow and return its JSON-safe final context.

    The YAML payload is written to a temporary file (``Workflow.from_yaml``
    reads from a path), the workflow is executed with the optional initial
    context, and the resulting context is returned with non-serializable
    values stringified.  Any failure surfaces as HTTP 500 with the message.

    Fixes: ``del serializable_context["__builtins__"]`` raised KeyError when
    the key was absent — now a tolerant ``pop``; temp-file cleanup was
    duplicated across the success and error paths — now a single ``finally``.
    """
    tmp_path = None
    try:
        # Create a temporary file to hold the YAML content.
        with tempfile.NamedTemporaryFile(delete=False, mode='w+', suffix='.yaml') as tmp:
            tmp.write(request.yaml_content)
            tmp_path = tmp.name

        wf = Workflow.from_yaml(tmp_path)
        context = execute_workflow(wf, request.init_ctx)

        # Make sure the context is JSON serializable before returning it.
        serializable_context = serialize_context(context)
        # exec() can leave __builtins__ in the context; drop it if present.
        serializable_context.pop("__builtins__", None)
        return serializable_context
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Always remove the temp file, on success or failure.
        if tmp_path and os.path.exists(tmp_path):
            os.remove(tmp_path)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os

import inflection
from slugify import slugify

from dspygen.dsl.dsl_pipeline_executor import execute_pipeline


def feature_code_generation():
    """Run the gherkin pipeline and write the generated React JSX to disk.

    Fix: the trailing markdown ``` fence was stripped into ``code``, but the
    file was then written from the raw ``context.react_code``, so the
    cleanup had no effect — the cleaned string is now what gets written.
    """
    context = execute_pipeline(f'{os.getcwd()}/data_gherkin_pipeline.yaml',
                               init_ctx={"file_path": f"{os.getcwd()}/features.csv"})

    file_name = "hello-world"  # slugify(f"{inflection.underscore(result['FeatureDescription'])}")

    code = context.react_code
    # Strip a trailing markdown code fence if the model left one in.
    if code.endswith("```"):
        code = code[:-3]

    with open(f"{file_name}.tsx", 'w') as f:
        f.write(code)  # fix: write the cleaned code, not the raw output
    print(f"React JSX code written to {file_name}")


def main():
    """CLI entry point: run the feature-to-React code generation demo."""
    feature_code_generation()


# Allow running this module directly as a script.
if __name__ == '__main__':
    main()
Loading

0 comments on commit d2eb614

Please sign in to comment.