diff --git a/.github/workflows/dapr-deploy.yml b/.github/workflows/dapr-deploy.yml index 44684157..d1437227 100644 --- a/.github/workflows/dapr-deploy.yml +++ b/.github/workflows/dapr-deploy.yml @@ -69,4 +69,7 @@ jobs: kubectl apply -f ./longhaul-test/pubsub-workflow-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/pubsub-workflow-app -n ${{ env.APP_NAMESPACE }} kubectl apply -f ./longhaul-test/snapshot-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/snapshot-app -n ${{ env.APP_NAMESPACE }} kubectl apply -f ./longhaul-test/validation-worker-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/validation-worker-app -n ${{ env.APP_NAMESPACE }} - kubectl apply -f ./longhaul-test/workflow-gen-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/workflow-gen-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-jobs-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-jobs-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-actor-reminders-server-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-actor-reminders-server-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-actor-reminders-client-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-actor-reminders-client-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-workflow-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-workflow-app -n ${{ env.APP_NAMESPACE }} \ No newline at end of file diff --git a/.github/workflows/dapr-longhaul-weekly.yml b/.github/workflows/dapr-longhaul-weekly.yml index eade4c84..9ee9091f 100644 --- a/.github/workflows/dapr-longhaul-weekly.yml +++ b/.github/workflows/dapr-longhaul-weekly.yml @@ -148,4 +148,7 @@ jobs: kubectl apply -f ./longhaul-test/snapshot-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/snapshot-app -n ${{ env.APP_NAMESPACE }} kubectl apply -f ./longhaul-test/validation-worker-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/validation-worker-app -n ${{ env.APP_NAMESPACE }} kubectl apply -f ./longhaul-test/workflow-gen-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/workflow-gen-app -n ${{ env.APP_NAMESPACE }} - + kubectl apply -f ./longhaul-test/scheduler-jobs-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-jobs-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-actor-reminders-server-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-actor-reminders-server-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-actor-reminders-client-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-actor-reminders-client-app -n ${{ env.APP_NAMESPACE }} + kubectl apply -f ./longhaul-test/scheduler-workflow-deploy.yml -n ${{ env.APP_NAMESPACE }} && kubectl rollout restart deploy/scheduler-workflow-app -n ${{ env.APP_NAMESPACE }} \ No newline at end of file diff --git a/.github/workflows/scheduler-actor-reminders-client-build.yml b/.github/workflows/scheduler-actor-reminders-client-build.yml new file mode 100644 index 00000000..a6040719 --- /dev/null +++ b/.github/workflows/scheduler-actor-reminders-client-build.yml @@ -0,0 +1,84 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +name: build-scheduler-actor-reminders-client + +on: + push: + branches: + - master + paths: + - 'scheduler-actor-reminders/client/**' + - '.github/workflows/scheduler-actor-reminders-client-build.yml' + - '.github/workflows/dapr-deploy.yml' + pull_request: + branches: + - master + paths: + - 'scheduler-actor-reminders/client/**' + +jobs: + build: + name: build scheduler-actor-reminders-client + runs-on: ubuntu-latest + env: + APP_REGISTRY: daprtests.azurecr.io + APP_IMAGE_NAME: scheduler-actor-reminders-client + # TODO: APP_VER needs to be versioned correctly + APP_VER: dev + APP_DIR: ./scheduler-actor-reminders-client + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: docker login + if: github.event_name != 'pull_request' + uses: docker/login-action@v1 + with: + registry: ${{ secrets.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_REGISTRY_ID }} + password: ${{ secrets.DOCKER_REGISTRY_PASS }} + - name: Build scheduler-actor-reminders-client app docker image + run: | + docker compose build ${{ env.APP_IMAGE_NAME }} + docker tag ${{ env.APP_IMAGE_NAME }} ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Push scheduler-actor-reminders-client app image to dockerhub + if: github.event_name != 'pull_request' + run: | + docker push ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Copy deployment yaml to archieve + run: | + mkdir -p ${{ env.ARTIFACT_DIR }} + cp ./longhaul-test/*.yml ${{ env.ARTIFACT_DIR }} + - name: Upload artifacts + uses: actions/upload-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + deploy: + name: deploy scheduler-actor-reminders-client to test cluster + needs: build + if: github.event_name != 'pull_request' + runs-on: ubuntu-latest + env: + APP_NAMESPACE: longhaul-test + TEST_CLUSTER_NAME: aks-longhaul-release + TEST_RESOURCE_GROUP: aks-longhaul-release + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: download artifacts + uses: actions/download-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + - name: Login Azure + run: | + az login --service-principal -u ${{ secrets.AZURE_LOGIN_USER }} -p ${{ secrets.AZURE_LOGIN_PASS }} --tenant ${{ secrets.AZURE_TENANT }} --output none + - name: Set up kubeconf for longhaul test environment + run: | + az aks get-credentials -n ${{ env.TEST_CLUSTER_NAME }} -g ${{ env.TEST_RESOURCE_GROUP }} + - name: Deploy apps to longhaul test environment + run: | + kubectl apply -n ${{ env.APP_NAMESPACE }} -f ${{ env.ARTIFACT_DIR }}/scheduler-actor-reminders-client-deploy.yml & kubectl rollout restart -n ${{ env.APP_NAMESPACE }} deploy/scheduler-actor-reminders-client-app diff --git a/.github/workflows/scheduler-actor-reminders-server-build.yml b/.github/workflows/scheduler-actor-reminders-server-build.yml new file mode 100644 index 00000000..85c50e67 --- /dev/null +++ b/.github/workflows/scheduler-actor-reminders-server-build.yml @@ -0,0 +1,84 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +name: build-scheduler-actor-reminders-server + +on: + push: + branches: + - master + paths: + - 'scheduler-actor-reminders/server/**' + - '.github/workflows/scheduler-actor-reminders-server-build.yml' + - '.github/workflows/dapr-deploy.yml' + pull_request: + branches: + - master + paths: + - 'scheduler-actor-reminders/server/**' + +jobs: + build: + name: build scheduler-actor-reminders-server + runs-on: ubuntu-latest + env: + APP_REGISTRY: daprtests.azurecr.io + APP_IMAGE_NAME: scheduler-actor-reminders-server + # TODO: APP_VER needs to be versioned correctly + APP_VER: dev + APP_DIR: ./scheduler-actor-reminders-server + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: docker login + if: github.event_name != 'pull_request' + uses: docker/login-action@v1 + with: + registry: ${{ secrets.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_REGISTRY_ID }} + password: ${{ secrets.DOCKER_REGISTRY_PASS }} + - name: Build scheduler-actor-reminders-server app docker image + run: | + docker compose build ${{ env.APP_IMAGE_NAME }} + docker tag ${{ env.APP_IMAGE_NAME }} ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Push scheduler-actor-reminders-server app image to dockerhub + if: github.event_name != 'pull_request' + run: | + docker push ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Copy deployment yaml to archieve + run: | + mkdir -p ${{ env.ARTIFACT_DIR }} + cp ./longhaul-test/*.yml ${{ env.ARTIFACT_DIR }} + - name: Upload artifacts + uses: actions/upload-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + deploy: + name: deploy scheduler-actor-reminders-server to test cluster + needs: build + if: github.event_name != 'pull_request' + runs-on: ubuntu-latest + env: + APP_NAMESPACE: longhaul-test + TEST_CLUSTER_NAME: aks-longhaul-release + TEST_RESOURCE_GROUP: aks-longhaul-release + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: download artifacts + uses: actions/download-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + - name: Login Azure + run: | + az login --service-principal -u ${{ secrets.AZURE_LOGIN_USER }} -p ${{ secrets.AZURE_LOGIN_PASS }} --tenant ${{ secrets.AZURE_TENANT }} --output none + - name: Set up kubeconf for longhaul test environment + run: | + az aks get-credentials -n ${{ env.TEST_CLUSTER_NAME }} -g ${{ env.TEST_RESOURCE_GROUP }} + - name: Deploy apps to longhaul test environment + run: | + kubectl apply -n ${{ env.APP_NAMESPACE }} -f ${{ env.ARTIFACT_DIR }}/scheduler-actor-reminders-server-deploy.yml & kubectl rollout restart -n ${{ env.APP_NAMESPACE }} deploy/scheduler-actor-reminders-server-app diff --git a/.github/workflows/scheduler-jobs-build.yml b/.github/workflows/scheduler-jobs-build.yml new file mode 100644 index 00000000..dbaeca85 --- /dev/null +++ b/.github/workflows/scheduler-jobs-build.yml @@ -0,0 +1,84 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +name: build-scheduler-jobs + +on: + push: + branches: + - master + paths: + - 'scheduler-jobs/**' + - '.github/workflows/scheduler-jobs-build.yml' + - '.github/workflows/dapr-deploy.yml' + pull_request: + branches: + - master + paths: + - 'scheduler-jobs/**' + +jobs: + build: + name: build scheduler-jobs + runs-on: ubuntu-latest + env: + APP_REGISTRY: daprtests.azurecr.io + APP_IMAGE_NAME: scheduler-jobs + # TODO: APP_VER needs to be versioned correctly + APP_VER: dev + APP_DIR: ./scheduler-jobs + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: docker login + if: github.event_name != 'pull_request' + uses: docker/login-action@v1 + with: + registry: ${{ secrets.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_REGISTRY_ID }} + password: ${{ secrets.DOCKER_REGISTRY_PASS }} + - name: Build scheduler-jobs app docker image + run: | + docker compose build ${{ env.APP_IMAGE_NAME }} + docker tag ${{ env.APP_IMAGE_NAME }} ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Push scheduler-jobs app image to dockerhub + if: github.event_name != 'pull_request' + run: | + docker push ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Copy deployment yaml to archieve + run: | + mkdir -p ${{ env.ARTIFACT_DIR }} + cp ./longhaul-test/*.yml ${{ env.ARTIFACT_DIR }} + - name: Upload artifacts + uses: actions/upload-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + deploy: + name: deploy scheduler-jobs to test cluster + needs: build + if: github.event_name != 'pull_request' + runs-on: ubuntu-latest + env: + APP_NAMESPACE: longhaul-test + TEST_CLUSTER_NAME: aks-longhaul-release + TEST_RESOURCE_GROUP: aks-longhaul-release + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: download artifacts + uses: actions/download-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + - name: Login Azure + run: | + az login --service-principal -u ${{ secrets.AZURE_LOGIN_USER }} -p ${{ secrets.AZURE_LOGIN_PASS }} --tenant ${{ secrets.AZURE_TENANT }} --output none + - name: Set up kubeconf for longhaul test environment + run: | + az aks get-credentials -n ${{ env.TEST_CLUSTER_NAME }} -g ${{ env.TEST_RESOURCE_GROUP }} + - name: Deploy apps to longhaul test environment + run: | + kubectl apply -n ${{ env.APP_NAMESPACE }} -f ${{ env.ARTIFACT_DIR }}/scheduler-jobs-deploy.yml & kubectl rollout restart -n ${{ env.APP_NAMESPACE }} deploy/scheduler-jobs-app diff --git a/.github/workflows/scheduler-workflow-build.yml b/.github/workflows/scheduler-workflow-build.yml new file mode 100644 index 00000000..7c513915 --- /dev/null +++ b/.github/workflows/scheduler-workflow-build.yml @@ -0,0 +1,84 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +name: build-scheduler-workflow + +on: + push: + branches: + - master + paths: + - 'scheduler-workflow/**' + - '.github/workflows/scheduler-workflow-build.yml' + - '.github/workflows/dapr-deploy.yml' + pull_request: + branches: + - master + paths: + - 'scheduler-workflow/**' + +jobs: + build: + name: build scheduler-workflow + runs-on: ubuntu-latest + env: + APP_REGISTRY: daprtests.azurecr.io + APP_IMAGE_NAME: scheduler-workflow + # TODO: APP_VER needs to be versioned correctly + APP_VER: dev + APP_DIR: ./scheduler-workflow + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: Check out code + uses: actions/checkout@v2 + - name: docker login + if: github.event_name != 'pull_request' + uses: docker/login-action@v1 + with: + registry: ${{ secrets.DOCKER_REGISTRY }} + username: ${{ secrets.DOCKER_REGISTRY_ID }} + password: ${{ secrets.DOCKER_REGISTRY_PASS }} + - name: Build scheduler-workflow app docker image + run: | + docker compose build ${{ env.APP_IMAGE_NAME }} + docker tag ${{ env.APP_IMAGE_NAME }} ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Push scheduler-workflow app image to dockerhub + if: github.event_name != 'pull_request' + run: | + docker push ${{ env.APP_REGISTRY }}/${{ env.APP_IMAGE_NAME }}:${{ env.APP_VER }} + - name: Copy deployment yaml to archieve + run: | + mkdir -p ${{ env.ARTIFACT_DIR }} + cp ./longhaul-test/*.yml ${{ env.ARTIFACT_DIR }} + - name: Upload artifacts + uses: actions/upload-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + deploy: + name: deploy scheduler-workflow to test cluster + needs: build + if: github.event_name != 'pull_request' + runs-on: ubuntu-latest + env: + APP_NAMESPACE: longhaul-test + TEST_CLUSTER_NAME: aks-longhaul-release + TEST_RESOURCE_GROUP: aks-longhaul-release + ARTIFACT_DIR: ./deploy_artifact + steps: + - name: download artifacts + uses: actions/download-artifact@master + with: + name: longhaul-test + path: ${{ env.ARTIFACT_DIR }} + - name: Login Azure + run: | + az login --service-principal -u ${{ secrets.AZURE_LOGIN_USER }} -p ${{ secrets.AZURE_LOGIN_PASS }} --tenant ${{ secrets.AZURE_TENANT }} --output none + - name: Set up kubeconf for longhaul test environment + run: | + az aks get-credentials -n ${{ env.TEST_CLUSTER_NAME }} -g ${{ env.TEST_RESOURCE_GROUP }} + - name: Deploy apps to longhaul test environment + run: | + kubectl apply -n ${{ env.APP_NAMESPACE }} -f ${{ env.ARTIFACT_DIR }}/scheduler-workflow-deploy.yml & kubectl rollout restart -n ${{ env.APP_NAMESPACE }} deploy/scheduler-workflow-app diff --git a/deploy/aks/apps/scheduler-actor-reminders-client-deploy.bicep b/deploy/aks/apps/scheduler-actor-reminders-client-deploy.bicep new file mode 100644 index 00000000..c45110fc --- /dev/null +++ b/deploy/aks/apps/scheduler-actor-reminders-client-deploy.bicep @@ -0,0 +1,82 @@ +@secure() +param kubeConfig string +param kubernetesNamespace string + +import 'kubernetes@1.0.0' with { + namespace: 'default' + kubeConfig: kubeConfig +} + +resource coreService_schedulerActorRemindersClient 'core/Service@v1' = { + metadata: { + name: 'scheduler-actor-reminders-client' + labels: { + app: 'scheduler-actor-reminders-client' + } + namespace: kubernetesNamespace + } + spec: { + selector: { + app: 'scheduler-actor-reminders-client' + } + ports: [ + { + protocol: 'TCP' + port: 9988 + targetPort: 9988 + } + ] + type: 'ClusterIP' + } +} + +resource appsDeployment_schedulerActorRemindersClient 'apps/Deployment@v1' = { + metadata: { + name: 'scheduler-actor-reminders-client-app' + labels: { + app: 'scheduler-actor-reminders-client' + } + namespace: kubernetesNamespace + } + spec: { + replicas: 1 + selector: { + matchLabels: { + app: 'scheduler-actor-reminders-client' + } + } + template: { + metadata: { + labels: { + app: 'scheduler-actor-reminders-client' + } + annotations: { + 'dapr.io/enabled': 'true' + 'dapr.io/app-id': 'scheduler-actor-reminders-client' + 'dapr.io/enable-profiling': 'true' + 'dapr.io/log-as-json': 'true' + 'prometheus.io/scrape': 'true' + 'prometheus.io/port': '9988' + } + } + spec: { + containers: [ + { + name: 'scheduler-actor-reminders-client' + image: 'daprtests.azurecr.io/scheduler-actor-reminders-client:dev' + ports: [ + { + containerPort: 3008 + } + { + name: 'prom' + containerPort: 9988 + } + ] + imagePullPolicy: 'Always' + } + ] + } + } + } +} diff --git a/deploy/aks/apps/scheduler-actor-reminders-server-deploy.bicep b/deploy/aks/apps/scheduler-actor-reminders-server-deploy.bicep new file mode 100644 index 00000000..2db6a0a7 --- /dev/null +++ b/deploy/aks/apps/scheduler-actor-reminders-server-deploy.bicep @@ -0,0 +1,82 @@ +@secure() +param kubeConfig string +param kubernetesNamespace string + +import 'kubernetes@1.0.0' with { + namespace: 'default' + kubeConfig: kubeConfig +} + +resource coreService_schedulerActorRemindersServer 'core/Service@v1' = { + metadata: { + name: 'scheduler-actor-reminders-server' + labels: { + app: 'scheduler-actor-reminders-server' + } + namespace: kubernetesNamespace + } + spec: { + selector: { + app: 'scheduler-actor-reminders-server' + } + ports: [ + { + protocol: 'TCP' + port: 9988 + targetPort: 9988 + } + ] + type: 'ClusterIP' + } +} + +resource appsDeployment_schedulerActorRemindersServer 'apps/Deployment@v1' = { + metadata: { + name: 'scheduler-actor-reminders-server-app' + labels: { + app: 'scheduler-actor-reminders-server' + } + namespace: kubernetesNamespace + } + spec: { + replicas: 1 + selector: { + matchLabels: { + app: 'scheduler-actor-reminders-server' + } + } + template: { + metadata: { + labels: { + app: 'scheduler-actor-reminders-server' + } + annotations: { + 'dapr.io/enabled': 'true' + 'dapr.io/app-id': 'scheduler-actor-reminders-server' + 'dapr.io/enable-profiling': 'true' + 'dapr.io/log-as-json': 'true' + 'prometheus.io/scrape': 'true' + 'prometheus.io/port': '9988' + } + } + spec: { + containers: [ + { + name: 'scheduler-actor-reminders-server' + image: 'daprtests.azurecr.io/scheduler-actor-reminders-server:dev' + ports: [ + { + containerPort: 3007 + } + { + name: 'prom' + containerPort: 9988 + } + ] + imagePullPolicy: 'Always' + } + ] + } + } + } +} diff --git a/deploy/aks/apps/scheduler-jobs-deploy.bicep b/deploy/aks/apps/scheduler-jobs-deploy.bicep new file mode 100644 index 00000000..ea5e180b --- /dev/null +++ b/deploy/aks/apps/scheduler-jobs-deploy.bicep @@ -0,0 +1,82 @@ +@secure() +param kubeConfig string +param kubernetesNamespace string + +import 'kubernetes@1.0.0' with { + namespace: 'default' + kubeConfig: kubeConfig +} + +resource coreService_schedulerJobs 'core/Service@v1' = { + metadata: { + name: 'scheduler-jobs' + labels: { + app: 'scheduler-jobs' + } + namespace: kubernetesNamespace + } + spec: { + selector: { + app: 'scheduler-jobs' + } + ports: [ + { + protocol: 'TCP' + port: 9988 + targetPort: 9988 + } + ] + type: 'ClusterIP' + } +} + +resource appsDeployment_schedulerJobs 'apps/Deployment@v1' = { + metadata: { + name: 'scheduler-jobs-app' + labels: { + app: 'scheduler-jobs' + } + namespace: kubernetesNamespace + } + spec: { + replicas: 1 + selector: { + matchLabels: { + app: 'scheduler-jobs' + } + } + template: { + metadata: { + labels: { + app: 'scheduler-jobs' + } + annotations: { + 'dapr.io/enabled': 'true' + 'dapr.io/app-id': 'scheduler-jobs' + 'dapr.io/enable-profiling': 'true' + 'dapr.io/log-as-json': 'true' + 'prometheus.io/scrape': 'true' + 'prometheus.io/port': '9988' + } + } + spec: { + containers: [ + { + name: 'scheduler-jobs' + image: 'daprtests.azurecr.io/scheduler-jobs:dev' + ports: [ + { + containerPort: 3006 + } + { + name: 'prom' + containerPort: 9988 + } + ] + imagePullPolicy: 'Always' + } + ] + } + } + } +} diff --git a/deploy/aks/apps/scheduler-workflow-deploy.bicep b/deploy/aks/apps/scheduler-workflow-deploy.bicep new file mode 100644 index 00000000..25a08640 --- /dev/null +++ b/deploy/aks/apps/scheduler-workflow-deploy.bicep @@ -0,0 +1,82 @@ +@secure() +param kubeConfig string +param kubernetesNamespace string + +import 'kubernetes@1.0.0' with { + namespace: 'default' + kubeConfig: kubeConfig +} + +resource coreService_schedulerWorkflow 'core/Service@v1' = { + metadata: { + name: 'scheduler-workflow' + labels: { + app: 'scheduler-workflow' + } + namespace: kubernetesNamespace + } + spec: { + selector: { + app: 'scheduler-workflow' + } + ports: [ + { + protocol: 'TCP' + port: 9988 + targetPort: 9988 + } + ] + type: 'ClusterIP' + } +} + +resource appsDeployment_schedulerWorkflow 'apps/Deployment@v1' = { + metadata: { + name: 'scheduler-workflow-app' + labels: { + app: 'scheduler-workflow' + } + namespace: kubernetesNamespace + } + spec: { + replicas: 1 + selector: { + matchLabels: { + app: 'scheduler-workflow' + } + } + template: { + metadata: { + labels: { + app: 'scheduler-workflow' + } + annotations: { + 'dapr.io/enabled': 'true' + 'dapr.io/app-id': 'scheduler-workflow' + 'dapr.io/enable-profiling': 'true' + 'dapr.io/log-as-json': 'true' + 'prometheus.io/scrape': 'true' + 'prometheus.io/port': '9988' + } + } + spec: { + containers: [ + { + name: 'scheduler-workflow' + image: 'daprtests.azurecr.io/scheduler-workflow:dev' + ports: [ + { + containerPort: 3009 + } + { + name: 'prom' + containerPort: 9988 + } + ] + imagePullPolicy: 'Always' + } + ] + } + } + } +} diff --git a/deploy/containerapps/apps/scheduler-actor-reminders-client.bicep b/deploy/containerapps/apps/scheduler-actor-reminders-client.bicep new file mode 100644 index 00000000..e90772f6 --- /dev/null +++ b/deploy/containerapps/apps/scheduler-actor-reminders-client.bicep @@ -0,0 +1,39 @@ +param appName string = 'scheduler-actor-reminders-client' +param containerPort int = 3008 +param environmentName string +param location string + +resource environment 'Microsoft.App/managedEnvironments@2022-03-01' existing = { + name: environmentName +} + +resource schedulerActorRemindersClient 'Microsoft.App/containerApps@2022-03-01' = { + name: appName + location: location + properties: { + managedEnvironmentId: environment.id + template: { + containers: [ + { + name: appName + image: 'dapriotest/${appName}:dev' + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } + configuration: { + ingress: { + external: false + targetPort: containerPort + } + dapr: { + enabled: true + appId: appName + appPort: containerPort + } + } + } +} \ No newline at end of file diff --git a/deploy/containerapps/apps/scheduler-actor-reminders-server.bicep b/deploy/containerapps/apps/scheduler-actor-reminders-server.bicep new file mode 100644 index 00000000..1c67c66b --- /dev/null +++ b/deploy/containerapps/apps/scheduler-actor-reminders-server.bicep @@ -0,0 +1,39 @@ +param appName string = 'scheduler-actor-reminders-server' +param containerPort int = 3007 +param environmentName string +param location string + +resource environment 'Microsoft.App/managedEnvironments@2022-03-01' existing = { + name: environmentName +} + +resource schedulerActorRemindersServer 'Microsoft.App/containerApps@2022-03-01' = { + name: appName + location: location + properties: { + managedEnvironmentId: environment.id + template: { + containers: [ + { + name: appName + image: 'dapriotest/${appName}:dev' + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } + configuration: { + ingress: { + external: false + targetPort: containerPort + } + dapr: { + enabled: true + appId: appName + appPort: containerPort + } + } + } +} \ No newline at end of file diff --git a/deploy/containerapps/apps/scheduler-jobs.bicep b/deploy/containerapps/apps/scheduler-jobs.bicep new file mode 100644 index 00000000..9ce68d9d --- /dev/null +++ b/deploy/containerapps/apps/scheduler-jobs.bicep @@ -0,0 +1,39 @@ +param appName string = 'scheduler-jobs' +param containerPort int = 3006 +param environmentName string +param location string + +resource environment 'Microsoft.App/managedEnvironments@2022-03-01' existing = { + name: environmentName +} + +resource schedulerJobs 'Microsoft.App/containerApps@2022-03-01' = { + name: appName + location: location + properties: { + managedEnvironmentId: environment.id + template: { + containers: [ + { + name: appName + image: 'dapriotest/${appName}:dev' + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } + configuration: { + ingress: { + external: false + targetPort: containerPort + } + dapr: { + enabled: true + appId: appName + appPort: containerPort + } + } + } +} \ No newline at end of file diff --git a/deploy/containerapps/apps/scheduler-workflow.bicep b/deploy/containerapps/apps/scheduler-workflow.bicep new file mode 100644 index 00000000..481529ee --- /dev/null +++ b/deploy/containerapps/apps/scheduler-workflow.bicep @@ -0,0 +1,39 @@ +param appName string = 'scheduler-workflow' +param containerPort int = 3009 +param environmentName string +param location string + +resource environment 'Microsoft.App/managedEnvironments@2022-03-01' existing = { + name: environmentName +} + +resource schedulerWorkflow 'Microsoft.App/containerApps@2022-03-01' = { + name: appName + location: location + properties: { + managedEnvironmentId: environment.id + template: { + containers: [ + { + name: appName + image: 'dapriotest/${appName}:dev' + } + ] + scale: { + minReplicas: 1 + maxReplicas: 1 + } + } + configuration: { + ingress: { + external: false + targetPort: containerPort + } + dapr: { + enabled: true + appId: appName + appPort: containerPort + } + } + } +} \ No newline at end of file diff --git a/deploy/dapr-multi-app/dapr.yaml b/deploy/dapr-multi-app/dapr.yaml index 16d16843..a3649f3c 100644 --- a/deploy/dapr-multi-app/dapr.yaml +++ b/deploy/dapr-multi-app/dapr.yaml @@ -30,4 +30,24 @@ apps: - appID: pubsub-workflow appDirPath: ../../pubsub-workflow appPort: 3005 - command: ["dotnet","run", "--DaprHTTPAppPort=3005"] \ No newline at end of file + command: ["dotnet","run", "--DaprHTTPAppPort=3005"] + - appID: scheduler-jobs + appDirPath: ../../scheduler-jobs + appPort: 3006 + daprHTTPPort: 3008 + command: [ "go","run", "scheduler-jobs.go"] + - appID: scheduler-actor-reminders-server + appDirPath: ../../scheduler-actor-reminders/server + appPort: 3007 + daprHTTPPort: 3007 + command: [ "go","run", "player-actor.go"] + - appID: scheduler-actor-reminders-client + appDirPath: ../../scheduler-actor-reminders/client + appPort: 3008 + daprHTTPPort: 3008 + command: [ "go","run", "player-actor-client.go"] + - appID: scheduler-workflow + appDirPath: ../../scheduler-workflow + appPort: 3009 + daprHTTPPort: 3009 + command: [ "go","run", "scheduler-workflow.go"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d02bc07f..738898ea 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -47,4 +47,28 @@ services: image: workflow-gen build: context: . - dockerfile: workflow-gen/Dockerfile \ No newline at end of file + dockerfile: workflow-gen/Dockerfile + + scheduler-jobs: + image: scheduler-jobs + build: + context: ./scheduler-jobs + dockerfile: Dockerfile + + scheduler-actor-reminders-server: + image: scheduler-actor-reminders-server + build: + context: ./scheduler-actor-reminders + dockerfile: Dockerfile-server + + scheduler-actor-reminders-client: + image: scheduler-actor-reminders-client + build: + context: ./scheduler-actor-reminders + dockerfile: Dockerfile-client + + scheduler-workflow: + image: scheduler-workflow + build: + context: ./scheduler-workflow + dockerfile: Dockerfile \ No newline at end of file diff --git a/longhaul-test/scheduler-actor-reminders-client-deploy.yml b/longhaul-test/scheduler-actor-reminders-client-deploy.yml new file mode 100644 index 00000000..e37e77aa --- /dev/null +++ b/longhaul-test/scheduler-actor-reminders-client-deploy.yml @@ -0,0 +1,54 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +kind: Service +apiVersion: v1 +metadata: + name: scheduler-actor-reminders-client + labels: + app: scheduler-actor-reminders-client +spec: + selector: + app: scheduler-actor-reminders-client + ports: + - protocol: TCP + port: 9988 + targetPort: 9988 + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler-actor-reminders-client-app + labels: + app: scheduler-actor-reminders-client +spec: + replicas: 1 + selector: + matchLabels: + app: scheduler-actor-reminders-client + template: + metadata: + labels: + app: scheduler-actor-reminders-client + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "scheduler-actor-reminders-client" + dapr.io/enable-profiling: "true" + dapr.io/log-as-json: "true" + prometheus.io/scrape: 'true' + prometheus.io/port: '9988' + spec: + containers: + - name: scheduler-actor-reminders-client + image: daprtests.azurecr.io/scheduler-actor-reminders-client:dev + ports: + - name: dapr + containerPort: 3008 + - name: prom + containerPort: 9988 + + imagePullPolicy: Always diff --git a/longhaul-test/scheduler-actor-reminders-server-deploy.yml b/longhaul-test/scheduler-actor-reminders-server-deploy.yml new file mode 100644 index 00000000..76f9d75f --- /dev/null +++ b/longhaul-test/scheduler-actor-reminders-server-deploy.yml @@ -0,0 +1,54 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +kind: Service +apiVersion: v1 +metadata: + name: scheduler-actor-reminders-server + labels: + app: scheduler-actor-reminders-server +spec: + selector: + app: scheduler-actor-reminders-server + ports: + - protocol: TCP + port: 9988 + targetPort: 9988 + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler-actor-reminders-server-app + labels: + app: scheduler-actor-reminders-server +spec: + replicas: 1 + selector: + matchLabels: + app: scheduler-actor-reminders-server + template: + metadata: + labels: + app: scheduler-actor-reminders-server + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "scheduler-actor-reminders-server" + dapr.io/enable-profiling: "true" + dapr.io/log-as-json: "true" + prometheus.io/scrape: 'true' + prometheus.io/port: '9988' + spec: + containers: + - name: scheduler-actor-reminders-server + image: daprtests.azurecr.io/scheduler-actor-reminders-server:dev + ports: + - name: dapr + containerPort: 3007 + - name: prom + containerPort: 9988 + + imagePullPolicy: Always diff --git a/longhaul-test/scheduler-jobs-deploy.yml b/longhaul-test/scheduler-jobs-deploy.yml new file mode 100644 index 00000000..b36f2c03 --- /dev/null +++ b/longhaul-test/scheduler-jobs-deploy.yml @@ -0,0 +1,54 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +kind: Service +apiVersion: v1 +metadata: + name: scheduler-jobs + labels: + app: scheduler-jobs +spec: + selector: + app: scheduler-jobs + ports: + - protocol: TCP + port: 9988 + targetPort: 9988 + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler-jobs-app + labels: + app: scheduler-jobs +spec: + replicas: 1 + selector: + matchLabels: + app: scheduler-jobs + template: + metadata: + labels: + app: scheduler-jobs + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "scheduler-jobs" + dapr.io/enable-profiling: "true" + dapr.io/log-as-json: "true" + prometheus.io/scrape: 'true' + prometheus.io/port: '9988' + spec: + containers: + - name: scheduler-jobs + image: daprtests.azurecr.io/scheduler-jobs:dev + ports: + - name: dapr + containerPort: 3006 + - name: prom + containerPort: 9988 + + imagePullPolicy: Always diff --git a/longhaul-test/scheduler-workflow-deploy.yml b/longhaul-test/scheduler-workflow-deploy.yml new file mode 100644 index 00000000..bb28910b --- /dev/null +++ b/longhaul-test/scheduler-workflow-deploy.yml @@ -0,0 +1,54 @@ +# ------------------------------------------------------------ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# ------------------------------------------------------------ + +kind: Service +apiVersion: v1 +metadata: + name: scheduler-workflow + labels: + app: scheduler-workflow +spec: + selector: + app: scheduler-workflow + ports: + - protocol: TCP + port: 9988 + targetPort: 9988 + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: scheduler-workflow-app + labels: + app: scheduler-workflow +spec: + replicas: 1 + selector: + matchLabels: + app: scheduler-workflow + template: + metadata: + labels: + app: scheduler-workflow + annotations: + dapr.io/enabled: "true" + dapr.io/app-id: "scheduler-workflow" + dapr.io/enable-profiling: "true" + dapr.io/log-as-json: "true" + prometheus.io/scrape: 'true' + prometheus.io/port: '9988' + spec: + containers: + - name: scheduler-workflow + image: daprtests.azurecr.io/scheduler-workflow:dev + ports: + - name: dapr + containerPort: 3009 + - name: prom + containerPort: 9988 + + imagePullPolicy: Always diff --git a/scheduler-actor-reminders/Dockerfile-client b/scheduler-actor-reminders/Dockerfile-client new file mode 100644 index 00000000..211c3f37 --- /dev/null +++ b/scheduler-actor-reminders/Dockerfile-client @@ -0,0 +1,23 @@ +FROM golang:1.23 as builder + +# Set the working directory inside the container +WORKDIR /client + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +# Build the client app binary +RUN CGO_ENABLED=0 go build -o player-actor-client ./client/player-actor-client.go + +# Final stage +FROM alpine:latest + +WORKDIR /client + +# Copy binary from the builder stage +COPY --from=builder /client/player-actor-client . + +# Start the client +CMD ["/client/player-actor-client"] diff --git a/scheduler-actor-reminders/Dockerfile-server b/scheduler-actor-reminders/Dockerfile-server new file mode 100644 index 00000000..2a71e46a --- /dev/null +++ b/scheduler-actor-reminders/Dockerfile-server @@ -0,0 +1,25 @@ +FROM golang:1.23 as builder + +# Set the working directory inside the container +WORKDIR /server + +COPY go.mod go.sum ./ +RUN go mod download + +COPY . . + +# Build the server app binary +RUN CGO_ENABLED=0 go build -o player-actor ./server/player-actor.go + +# Final stage +FROM alpine:latest + +WORKDIR /server + +# Copy binary from the builder stage +COPY --from=builder /server/player-actor . + +EXPOSE 3007 + +# Start the server +CMD ["/server/player-actor"] diff --git a/scheduler-actor-reminders/README.md b/scheduler-actor-reminders/README.md new file mode 100644 index 00000000..8efc0fdd --- /dev/null +++ b/scheduler-actor-reminders/README.md @@ -0,0 +1,36 @@ +# Scheduler Actor Reminders + +## Overview +This project tests the Dapr Scheduler for handling Actor Reminders using an example `PlayerActor` actor. The example +simulates a game session where a player's health increases or decays periodically, managed through Dapr Actor reminders. + +## Project Structure +The `client` directory implements the client code that interacts with `PlayerActor`, specifically setting up reminders, +monitoring health, and handling shutdown. It invokes actor methods, but does not manage the actor lifecycle. + +The `server` directory implements the `PlayerActor` server code, which manages a game session for a player, including +health-based reminders. This code defines the actor lifecycle and its reminder-based state changes. + +## Reminders +Two reminders manage the actor's health: +- `healthReminder`: + - Increases the player's health if it is below full health. +- `healthDecayReminder`: + - Decreases the player's health periodically, simulating a natural decay over time. + +When the player's health reaches 0, the client unregisters the reminders, revives the player, and restarts the reminders. + +This tests the Scheduler for the underlying storage for Actor Reminders. + +## How To Run the Code: +Run the server with: +```shell +dapr run --app-id player-actor --app-port 3007 --dapr-http-port 3500 --log-level debug --config ../dapr/config.yaml -- go run player-actor.go +``` + +Run the client with: +```shell +dapr run --app-id player-actor --app-port 3008 --dapr-http-port 3501 --dapr-grpc-port 50001 --log-level debug --config ../dapr/config.yaml -- go run player-actor-client.go +``` + +Note the config is using `SchedulerReminders` \ No newline at end of file diff --git a/scheduler-actor-reminders/api/api.go b/scheduler-actor-reminders/api/api.go new file mode 100644 index 00000000..4871b691 --- /dev/null +++ b/scheduler-actor-reminders/api/api.go @@ -0,0 +1,96 @@ +package api + +import ( + "context" + "fmt" + + "github.com/dapr/go-sdk/actor" + dapr "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/examples/actor/api" +) + +const playerActorType = "playerActorType" + +type PlayerActor struct { + actor.ServerImplBaseCtx + DaprClient dapr.Client + Health int +} + +func (p *PlayerActor) Type() string { + return playerActorType +} + +type GetPlayerRequest struct { + ActorID string +} + +type GetPlayerResponse struct { + ActorID string + Health int +} + +// GetUser retrieving the state of the PlayerActor +func (p *PlayerActor) GetUser(ctx context.Context, player *GetPlayerRequest) (*GetPlayerResponse, error) { + if player.ActorID == p.ID() { + fmt.Printf("Player Actor ID: %s has a health level of: %d\n", p.ID(), p.Health) + return &GetPlayerResponse{ + ActorID: p.ID(), + Health: p.Health, + }, nil + } + return nil, nil +} + +// Invoke invokes an action on the actor +func (p *PlayerActor) Invoke(ctx context.Context, req string) (string, error) { + fmt.Println("get req = ", req) + return req, nil +} + +// RevivePlayer revives the actor players health back to 100 +func (p *PlayerActor) RevivePlayer(ctx context.Context, id string) error { + if id == p.ID() { + fmt.Printf("Reviving player: %s\n", id) + p.Health = 100 + } + + return nil +} + +// ReminderCall executes logic to handle what happens when the reminder is triggered +// Dapr automatically calls this method when a reminder fires for the player actor +func (p *PlayerActor) ReminderCall(reminderName string, state []byte, dueTime string, period string) { + fmt.Println("receive reminder = ", reminderName, " state = ", string(state), "duetime = ", dueTime, "period = ", period) + if reminderName == "healthReminder" { + // Increase health if below 100 + if p.Health < 100 { + p.Health += 10 + if p.Health > 100 { + p.Health = 100 + } + fmt.Printf("Player Actor health increased. Current health: %d\n", p.Health) + } + } else if reminderName == "healthDecayReminder" { + // Decrease health + p.Health -= 5 + if p.Health < 0 { + fmt.Println("Player Actor died...") + } + fmt.Printf("Health decreased. Current health: %d\n", p.Health) + } + +} + +// StartReminder registers a reminder for the actor +func (p *PlayerActor) StartReminder(ctx context.Context, req *api.ReminderRequest) error { + fmt.Println("Starting reminder:", req.ReminderName) + return p.DaprClient.RegisterActorReminder(ctx, &dapr.RegisterActorReminderRequest{ + ActorType: p.Type(), + ActorID: p.ID(), + Name: req.ReminderName, + DueTime: req.Duration, + Period: req.Period, + Data: []byte(req.Data), + }) +} diff --git a/scheduler-actor-reminders/client/player-actor-client.go b/scheduler-actor-reminders/client/player-actor-client.go new file mode 100644 index 00000000..26838814 --- /dev/null +++ b/scheduler-actor-reminders/client/player-actor-client.go @@ -0,0 +1,198 @@ +package main + +import ( + "context" + "encoding/json" + "log" + "os" + "os/signal" + "syscall" + "time" + + "test-infra/scheduler-actor-reminders/api" + + dapr "github.com/dapr/go-sdk/client" +) + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := dapr.NewClient() + if err != nil { + panic(err) + } + defer client.Close() + + // Actor ID for the player 'session' + actorID := "player-1" + deathSignal := make(chan bool) + + // Start monitoring actor player's health + go monitorPlayerHealth(ctx, client, actorID, deathSignal) + + incReminderCtx, incReminderCancel := context.WithTimeout(ctx, 5*time.Second) + defer incReminderCancel() + // Start player actor health increase reminder + err = client.RegisterActorReminder(incReminderCtx, &dapr.RegisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthReminder", + DueTime: "10s", + Period: "20s", // Every 20 seconds, increase health + Data: []byte(`"Health increase reminder"`), + }) + if err != nil { + log.Printf("error starting health increase reminder: %v", err) + } + log.Println("Started healthReminder for actor:", actorID) + + decReminderCtx, decReminderCancel := context.WithTimeout(ctx, 5*time.Second) + defer decReminderCancel() + // Start player actor health decay reminder + err = client.RegisterActorReminder(decReminderCtx, &dapr.RegisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthDecayReminder", + DueTime: "0s", + Period: "2s", // Every 2 seconds, decay health + Data: []byte(`"Health decay reminder"`), + }) + if err != nil { + log.Printf("failed to start health decay reminder: %w", err) + } + + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-deathSignal: + log.Println("Player is dead. Unregistering reminders...") + + log.Println("Unregistering health increase reminder for actor...") + unregIncReminderCtx, unregIncReminderCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.UnregisterActorReminder(unregIncReminderCtx, &dapr.UnregisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthReminder", + }) + unregIncReminderCancel() + if err != nil { + log.Printf("error unregistering actor reminder: %v", err) + } + + log.Println("Unregistering health decay reminder for actor...") + unregDecReminderCtx, unregDecReminderCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.UnregisterActorReminder(unregDecReminderCtx, &dapr.UnregisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthDecayReminder", + }) + unregDecReminderCancel() + if err != nil { + log.Printf("error unregistering actor reminder: %v", err) + } + + log.Println("Player reminders unregistered. Reviving player...") + req := &dapr.InvokeActorRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Method: "RevivePlayer", + Data: []byte(`"player-1"`), + } + invokeCtx, invokeCancel := context.WithTimeout(ctx, 5*time.Second) + _, err = client.InvokeActor(invokeCtx, req) + invokeCancel() + if err != nil { + log.Printf("error invoking actor method RevivePlayer: %v", err) + } + log.Println("Player revived, health reset to 100. Restarting reminders...") + + incRemCtx, incRemCancel := context.WithTimeout(ctx, 5*time.Second) + // Restart reminders + err = client.RegisterActorReminder(incRemCtx, &dapr.RegisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthReminder", + DueTime: "10s", + Period: "20s", + Data: []byte(`"Health increase reminder"`), + }) + incRemCancel() + if err != nil { + log.Printf("error starting actor reminder: %v", err) + } + log.Println("Started health increase reminder for actor:", actorID) + decRemCtx, decRemCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.RegisterActorReminder(decRemCtx, &dapr.RegisterActorReminderRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Name: "healthDecayReminder", + DueTime: "0s", + Period: "2s", // Every 5 seconds, decay health + Data: []byte(`"Health decay reminder"`), + }) + decRemCancel() + if err != nil { + log.Printf("error starting health decay reminder: %v", err) + } + log.Println("Started health decay reminder for actor:", actorID) + } + } + }(ctx) + + // Graceful shutdown on Ctrl+C or SIGTERM (for Docker/K8s graceful shutdown) + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM) + <-signalChan + log.Println("Shutting down...") +} + +// monitorPlayerHealth continuously checks the player's health every 5 seconds +// and signals via a channel if the player is dead (health <= 0). +func monitorPlayerHealth(ctx context.Context, client dapr.Client, actorID string, deathSignal chan bool) { + for { + select { + case <-ctx.Done(): + return + default: + // Check actor player's health + getPlayerRequest := &api.GetPlayerRequest{ActorID: actorID} + requestData, err := json.Marshal(getPlayerRequest) + if err != nil { + log.Printf("error marshaling request data: %v", err) + } + + req := &dapr.InvokeActorRequest{ + ActorType: "playerActorType", + ActorID: actorID, + Method: "GetUser", + Data: requestData, + } + invokeCtx, invokeCancel := context.WithTimeout(ctx, 5*time.Second) + resp, err := client.InvokeActor(invokeCtx, req) + invokeCancel() + if err != nil { + log.Printf("error invoking actor method GetUser: %v", err) + } + + playerResp := &api.GetPlayerResponse{} + err = json.Unmarshal(resp.Data, playerResp) + if err != nil { + log.Printf("error unmarshaling player state: %v", err) + } + log.Printf("Player health: %v\n", playerResp.Health) + + // If health is zero or below, signal player death + if playerResp.Health <= 0 { + deathSignal <- true + } else { + log.Printf("Player is alive with health: %d\n", playerResp.Health) + } + + // Sleep for 5 seconds before checking health again + time.Sleep(5 * time.Second) + } + } +} diff --git a/scheduler-actor-reminders/dapr/config.yaml b/scheduler-actor-reminders/dapr/config.yaml new file mode 100644 index 00000000..3c0de60f --- /dev/null +++ b/scheduler-actor-reminders/dapr/config.yaml @@ -0,0 +1,8 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: schedulerreminders +spec: + features: + - name: SchedulerReminders + enabled: true \ No newline at end of file diff --git a/scheduler-actor-reminders/go.mod b/scheduler-actor-reminders/go.mod new file mode 100644 index 00000000..e958c928 --- /dev/null +++ b/scheduler-actor-reminders/go.mod @@ -0,0 +1,23 @@ +module test-infra/scheduler-actor-reminders + +go 1.23.1 + +require ( + github.com/dapr/go-sdk v1.11.0 + github.com/dapr/go-sdk/examples/actor v0.0.0-20240626135542-c417f950fe1d +) + +require ( + github.com/dapr/dapr v1.14.0 // indirect + github.com/go-chi/chi/v5 v5.1.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/scheduler-actor-reminders/go.sum b/scheduler-actor-reminders/go.sum new file mode 100644 index 00000000..53e70fb9 --- /dev/null +++ b/scheduler-actor-reminders/go.sum @@ -0,0 +1,47 @@ +github.com/dapr/dapr v1.14.0 h1:SIQsNX1kH31JRDIS4k8IZ6eomM/BAcOP844PhQIT+BQ= +github.com/dapr/dapr v1.14.0/go.mod h1:oDNgaPHQIDZ3G4n4g89TElXWgkluYwcar41DI/oF4gw= +github.com/dapr/go-sdk v1.11.0 h1:clANpOQd6MsfvSa6snaX8MVk6eRx26Vsj5GxGdQ6mpE= +github.com/dapr/go-sdk v1.11.0/go.mod h1:btZ/tX8eYnx0fg3HiJUku8J5QBRXHsp3kAB1BUiTxXY= +github.com/dapr/go-sdk/examples/actor v0.0.0-20240626135542-c417f950fe1d h1:J7u+R5/FXuh1y757FHAnA75g3w8ADzm4idO1kQmKH80= +github.com/dapr/go-sdk/examples/actor v0.0.0-20240626135542-c417f950fe1d/go.mod h1:Vg/MQZ6O3JVTtp6NgNtAGRmyCamXjZDp6OBqkLfF8W8= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler-actor-reminders/server/player-actor.go b/scheduler-actor-reminders/server/player-actor.go new file mode 100644 index 00000000..52869686 --- /dev/null +++ b/scheduler-actor-reminders/server/player-actor.go @@ -0,0 +1,62 @@ +package main + +import ( + "context" + "log" + http2 "net/http" + "os" + "os/signal" + "syscall" + + "test-infra/scheduler-actor-reminders/api" + + "github.com/dapr/go-sdk/actor" + dapr "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/service/http" +) + +const appPort = ":3007" + +func playerActorFactory() actor.ServerContext { + client, err := dapr.NewClient() + if err != nil { + panic(err) + } + + return &api.PlayerActor{ + DaprClient: client, + Health: 100, // initial health + } +} + +func main() { + _, cancel := context.WithCancel(context.Background()) + defer cancel() + + daprService := http.NewService(appPort) + // Register actor factory, meaning register actor methods to be called by client + daprService.RegisterActorImplFactoryContext(playerActorFactory) + + go func() { + log.Println("Starting Dapr actor runtime...") + if err := daprService.Start(); err != nil && err.Error() != http2.ErrServerClosed.Error() { + log.Fatalf("error starting Dapr actor runtime: %v", err) + } + }() + + waitForShutdown(cancel) + if err := daprService.GracefulStop(); err != nil { + log.Fatalf("error stopping Dapr actor runtime: %v", err) + } +} + +// waitForShutdown keeps the app alive until an interrupt or termination signal is received +func waitForShutdown(cancelFunc context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + // Notify the channel on Interrupt (Ctrl+C) or SIGTERM (for Docker/K8s graceful shutdown) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + <-sigCh + + log.Println("Shutting down...") + cancelFunc() +} diff --git a/scheduler-jobs/Dockerfile b/scheduler-jobs/Dockerfile new file mode 100644 index 00000000..4b735471 --- /dev/null +++ b/scheduler-jobs/Dockerfile @@ -0,0 +1,28 @@ +FROM golang:1.23 as builder + +# Working directory inside the container +WORKDIR /scheduler-jobs + +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the app code +COPY . . + +# Build the app +RUN CGO_ENABLED=0 GOOS=linux go build -o scheduler-jobs . + +# Initialize a new build stage. Use a minimal base image for the final container +FROM alpine:latest + +# Install certificates for HTTPS support +RUN apk --no-cache add ca-certificates + +# Copy the binary from the builder image -> final image +COPY --from=builder /scheduler-jobs/scheduler-jobs /usr/local/bin/scheduler-jobs + +# Port the app will listen on +EXPOSE 3006 + +# Entrypoint to run the app +ENTRYPOINT ["scheduler-jobs"] diff --git a/scheduler-jobs/README.md b/scheduler-jobs/README.md new file mode 100644 index 00000000..a3549b93 --- /dev/null +++ b/scheduler-jobs/README.md @@ -0,0 +1,20 @@ +# Scheduler Jobs + +## Overview + +- schedule 100 oneshot jobs indefinitely (repeat = 1) +- schedule 100 indefinite jobs indefinitely (repeat not set, trigger every 30s) +- schedule repeat-job job indefinitely (repeat = 5, trigger every 1s due immediately) +- indefinitely schedule and delete a create-delete-job job (repeat = 1, trigger every 1s) + +## How-To Run Locally: + +Run with dapr: +```shell +dapr run \ + --app-id scheduler-jobs \ + --app-port 3006 \ + --dapr-grpc-port 3501 --app-protocol grpc \ + --dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \ + -- go run scheduler-jobs.go +``` \ No newline at end of file diff --git a/scheduler-jobs/go.mod b/scheduler-jobs/go.mod new file mode 100644 index 00000000..dca9f9fc --- /dev/null +++ b/scheduler-jobs/go.mod @@ -0,0 +1,21 @@ +module test-infra/scheduler-jobs + +go 1.23.1 + +require ( + github.com/dapr/dapr v1.14.4 + github.com/dapr/go-sdk v1.11.0 + google.golang.org/grpc v1.67.1 +) + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/scheduler-jobs/go.sum b/scheduler-jobs/go.sum new file mode 100644 index 00000000..6c1df8f9 --- /dev/null +++ b/scheduler-jobs/go.sum @@ -0,0 +1,43 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/dapr v1.14.4 h1:OEyAFvpW4AK+MWyX5xp9zVLiC+oY/5ziDzh16F+aoWU= +github.com/dapr/dapr v1.14.4/go.mod h1:pZGJvUGT4IDhEuWVXBvJ73TkPnLAW34SDagth7G3f8w= +github.com/dapr/go-sdk v1.11.0 h1:clANpOQd6MsfvSa6snaX8MVk6eRx26Vsj5GxGdQ6mpE= +github.com/dapr/go-sdk v1.11.0/go.mod h1:btZ/tX8eYnx0fg3HiJUku8J5QBRXHsp3kAB1BUiTxXY= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler-jobs/scheduler-jobs.go b/scheduler-jobs/scheduler-jobs.go new file mode 100644 index 00000000..77c6844b --- /dev/null +++ b/scheduler-jobs/scheduler-jobs.go @@ -0,0 +1,344 @@ +package main + +import ( + "context" + "fmt" + "log" + "net" + "os" + "os/signal" + "strconv" + "strings" + "sync/atomic" + "syscall" + "time" + + "google.golang.org/grpc" + + rtv1 "github.com/dapr/dapr/pkg/proto/runtime/v1" + "github.com/dapr/go-sdk/client" +) + +const appPort = ":3006" + +var oneshot atomic.Int64 +var indefinite atomic.Int64 +var repeat atomic.Int64 +var createDelete atomic.Int64 + +const maxOneshotTriggerCount = 100 +const maxIndefiniteTriggerCount = 100 +const maxRepeatTriggerCount = 5 +const maxCreateDeleteTriggerCount = 1 + +// Channel to trigger scheduling new jobs after reaching the max count +var oneshotDoneCh = make(chan struct{}, 1) +var indefiniteDoneCh = make(chan struct{}, 1) +var repeatDoneCh = make(chan struct{}, 1) +var receivedSingleJobDoneCh = make(chan struct{}, 1) +var createDeleteDoneCh = make(chan struct{}, 1) + +type appServer struct { + client client.Client +} + +// startServer starts a gRPC server for receiving callbacks +func startServer(ctx context.Context) error { + lis, err := net.Listen("tcp", appPort) + if err != nil { + return fmt.Errorf("failed to listen on %v: %w", appPort, err) + } + + s := grpc.NewServer() + rtv1.RegisterAppCallbackAlphaServer(s, &appServer{}) + + errCh := make(chan error, 1) + + go func() { + log.Printf("Starting gRPC server on port %s...\n", appPort) + if err := s.Serve(lis); err != nil { + errCh <- fmt.Errorf("failed to serve: %w", err) + } + }() + + select { + case <-ctx.Done(): + log.Println("Context canceled, shutting down gRPC server...") + s.GracefulStop() + return nil + case err := <-errCh: + return err + } +} + +func (s *appServer) OnBulkTopicEventAlpha1(ctx context.Context, in *rtv1.TopicEventBulkRequest) (*rtv1.TopicEventBulkResponse, error) { + return nil, nil +} + +func (s *appServer) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventRequest) (*rtv1.JobEventResponse, error) { + if strings.HasPrefix(in.GetMethod(), "job/") { + if strings.Contains(in.GetMethod(), "oneshot") { + count := oneshot.Add(1) + if count == maxOneshotTriggerCount { + // Reset the oneshot counter + oneshot.Store(0) + log.Println("Reached max oneshot job count, scheduling new jobs...") + // Send signal to start scheduling another batch of one shot jobs + oneshotDoneCh <- struct{}{} + } + } else if strings.Contains(in.GetMethod(), "indefinite") { + count := indefinite.Add(1) + if count == maxIndefiniteTriggerCount { + // Reset the oneshot counter + indefinite.Store(0) + log.Println("Reached max indefinite job count, scheduling new jobs...") + // Send signal to start scheduling another batch of indefinite jobs + indefiniteDoneCh <- struct{}{} + } + } else if strings.Contains(in.GetMethod(), "repeat-job") { + count := repeat.Add(1) + if count == maxRepeatTriggerCount { + // Reset the repeat counter + repeat.Store(0) + log.Println("Reached max repeat job count, scheduling new jobs...") + // Send signal to start scheduling another repeat job + repeatDoneCh <- struct{}{} + } + } else if strings.Contains(in.GetMethod(), "create-delete-job") { + count := createDelete.Add(1) + log.Printf("create-delete-job triggered count: %d\n", count) + + if count == maxCreateDeleteTriggerCount { + log.Println("Received the single create-delete-job, deleting it...") + // Send signal to delete the job + receivedSingleJobDoneCh <- struct{}{} + } else { + log.Printf("Received too many single repeat, create-delete-job jobs. Count: %d...\n", createDelete.Load()) + } + } + } + return nil, nil +} + +func scheduleOneshotJobs(ctx context.Context, daprClient client.Client) { + for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping scheduleOneshotJobs.") + return + default: + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + name := "oneshot-job-" + strconv.Itoa(i) + req := &client.Job{ + Name: name, + Schedule: "@every 30s", + Repeats: 1, // one shot job + DueTime: "5s", + TTL: "40s", + Data: nil, + } + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + cancel() + if err != nil { + log.Printf("Error scheduling oneshot job '%s': %s\n", name, err) + } + } +} + +func scheduleIndefiniteJobs(ctx context.Context, daprClient client.Client) { + for i := 0; i < 100; i++ { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping scheduleOneshotJobs.") + return + default: + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + name := "indefinite-job-" + strconv.Itoa(i) + req := &client.Job{ + Name: name, + Schedule: "@every 30s", + DueTime: "1s", + TTL: "40s", + Data: nil, + } + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + cancel() + if err != nil { + log.Printf("Error scheduling indefinite job '%s': %s\n", name, err) + } + } +} + +func scheduleRepeatedJob(ctx context.Context, daprClient client.Client) { + name := "repeat-job" + req := &client.Job{ + Name: name, + Schedule: "@every 1s", + DueTime: "0s", + Repeats: maxRepeatTriggerCount, + TTL: "10s", + Data: nil, + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + if err != nil { + log.Printf("Error scheduling repeat job '%s': %s\n", name, err) + } +} + +func scheduleSingleJob(ctx context.Context, daprClient client.Client) { + name := "create-delete-job" + req := &client.Job{ + Name: name, + Schedule: "@every 1s", + DueTime: "1s", + Repeats: maxCreateDeleteTriggerCount, + TTL: "3s", + Data: nil, + } + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.ScheduleJobAlpha1(jobCtx, req) + if err != nil { + log.Printf("Error scheduling single job '%s': %s\n", name, err) + } +} + +func deleteSingleJob(ctx context.Context, daprClient client.Client) { + name := "create-delete-job" + jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := daprClient.DeleteJobAlpha1(jobCtx, name) + if err != nil { + log.Printf("Error deleting single job '%s': %s\n", name, err) + } + createDelete.Store(0) + // signal to start the create/delete process again after waiting 2 minutes + createDeleteDoneCh <- struct{}{} +} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func(ctx context.Context) { + if err := startServer(ctx); err != nil { + log.Fatalf("Error starting server: %v", err) + } + }(ctx) + + daprClient, err := client.NewClient() + if err != nil { + log.Fatalf("Error getting dapr client: %v", err) + } + defer daprClient.Close() + + // allow time for sidecar to connect otherwise if the job is scheduled beforehand then it gets dropped on the + // scheduler side bc there's no connection to send the job back on + log.Println("waiting a few seconds to let connections establish") + time.Sleep(5 * time.Second) + + // Schedule initial batch of jobs + go scheduleOneshotJobs(ctx, daprClient) + go scheduleIndefiniteJobs(ctx, daprClient) + go scheduleRepeatedJob(ctx, daprClient) + + // Schedule additional oneshot jobs once 100 are triggered + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping oneshot scheduling goroutine.") + return + case <-oneshotDoneCh: + log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...") + time.Sleep(10 * time.Second) + log.Println("Scheduling next batch of oneshot jobs...") + go scheduleOneshotJobs(ctx, daprClient) + } + } + }(ctx) + + // Schedule additional indefinite jobs once 100 are triggered + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + log.Println("context canceled, stopping indefinite scheduling goroutine.") + return + case <-indefiniteDoneCh: + log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...") + time.Sleep(10 * time.Second) + log.Println("Scheduling next batch of indefinite jobs...") + go scheduleIndefiniteJobs(ctx, daprClient) + } + } + }(ctx) + + // Schedule job to trigger immediately every second for 1s for 5 times max (repeats) + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-repeatDoneCh: + log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...") + time.Sleep(60 * time.Second) + log.Println("Scheduling next repeated job...") + go scheduleRepeatedJob(ctx, daprClient) + } + } + }(ctx) + + // Handle receivedSingleJobDoneCh, this handles the scheduled job from the next go routine + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-receivedSingleJobDoneCh: + log.Println("Received input that the create-delete-job triggered, now deleting the job...") + // received triggered job, now delete it & set atomic int to 0 + deleteSingleJob(ctx, daprClient) + log.Println("Successfully deleted create-delete-job.") + } + } + }(ctx) + + go scheduleSingleJob(ctx, daprClient) + + // Reschedule the create-delete job after deletion, ensure triggers once + go func(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-createDeleteDoneCh: + log.Println("Received input that the create-delete-job was deleted. Sleeping for 5 seconds...") + time.Sleep(5 * time.Second) + log.Println("Scheduling create-delete-job...") + scheduleSingleJob(ctx, daprClient) + log.Println("Successfully scheduled create-delete-job.") + } + } + }(ctx) + + // Block until ctrl-c or sigterm + waitForShutdown(cancel) +} + +// waitForShutdown keeps the app alive until an interrupt or termination signal is received +func waitForShutdown(cancelFunc context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + // Notify the channel on Interrupt (Ctrl+C) or SIGTERM (for Docker/K8s graceful shutdown) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + + // Block until we receive a signal + <-sigCh + + log.Println("Shutting down...") + cancelFunc() +} diff --git a/scheduler-workflow/Dockerfile b/scheduler-workflow/Dockerfile new file mode 100644 index 00000000..46ee0593 --- /dev/null +++ b/scheduler-workflow/Dockerfile @@ -0,0 +1,28 @@ +FROM golang:1.23 as builder + +# Working directory inside the container +WORKDIR /scheduler-workflow + +COPY go.mod go.sum ./ +RUN go mod download + +# Copy the app code +COPY . . + +# Build the app +RUN CGO_ENABLED=0 GOOS=linux go build -o scheduler-workflow . + +# Initialize a new build stage. Use a minimal base image for the final container +FROM alpine:latest + +# Install certificates for HTTPS support +RUN apk --no-cache add ca-certificates + +# Copy the binary from the builder image -> final image +COPY --from=builder /scheduler-workflow/scheduler-workflow /usr/local/bin/scheduler-workflow + +# Port the app will listen on +EXPOSE 3009 + +# Entrypoint to run the app +ENTRYPOINT ["scheduler-workflow"] diff --git a/scheduler-workflow/README.md b/scheduler-workflow/README.md new file mode 100644 index 00000000..a357b3ef --- /dev/null +++ b/scheduler-workflow/README.md @@ -0,0 +1,39 @@ +# Scheduler Workflow + +## Overview + +The long-haul test workflow performs the following operations repeatedly: + +1. Initialize and Register Workflows & Activities: + - A worker is set up and registers the `TestWorkflow` and `TestActivity`, which will execute in each iteration of the + long-haul test. + +2. Start Workflow Iterations: + - In each iteration, a new workflow instance is started with an incremental identifier (e.g., `longhaul-instance-0`, + `longhaul-instance-1`, etc.). + +3. Workflow Lifecycle Management: + - The workflow is paused and then resumed, simulating real-world scenarios where workflows may need to be temporarily + halted and restarted. + - An external event, named `testEvent`, is raised, allowing the workflow to proceed to the next stage upon receiving + the event. + +4. Stage Tracking: + - Within `TestActivity`, a `stage` variable is incremented to track the current step of the workflow. This allows us + to observe the workflow's progress and simulate step-based processing. Once stage reaches `100`, it resets to `0`. + +5. Workflow Completion & Cleanup: + - The test monitors each workflow’s status, polling every 5s, and waits until the workflow completes or fails. + - Upon completion (or failure), the workflow is: + - Terminated to ensure it doesn’t continue running or consuming resources. + - Purged to remove the instance from the Dapr state, maintaining a clean test environment over time. + +6. Iteration Limit: + - After `100` iterations, the workflow `instanceID` counter resets to `0` to avoid excessive resource buildup. + +## How To Run the Code: + +Run the server with: +```shell +dapr run --app-id scheduler-workflow --app-port 3009 --dapr-http-port 3502 --log-level debug --config dapr/config.yaml -- go run workflow.go +``` \ No newline at end of file diff --git a/scheduler-workflow/dapr/config.yaml b/scheduler-workflow/dapr/config.yaml new file mode 100644 index 00000000..3c0de60f --- /dev/null +++ b/scheduler-workflow/dapr/config.yaml @@ -0,0 +1,8 @@ +apiVersion: dapr.io/v1alpha1 +kind: Configuration +metadata: + name: schedulerreminders +spec: + features: + - name: SchedulerReminders + enabled: true \ No newline at end of file diff --git a/scheduler-workflow/go.mod b/scheduler-workflow/go.mod new file mode 100644 index 00000000..47482637 --- /dev/null +++ b/scheduler-workflow/go.mod @@ -0,0 +1,28 @@ +module test-infra/scheduler-workflow + +go 1.23.1 + +require github.com/dapr/go-sdk v1.11.0 + +require ( + github.com/cenkalti/backoff/v4 v4.3.0 // indirect + github.com/dapr/dapr v1.14.0 // indirect + github.com/go-chi/chi/v5 v5.1.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/marusama/semaphore/v2 v2.5.0 // indirect + github.com/microsoft/durabletask-go v0.5.0 // indirect + go.opentelemetry.io/otel v1.27.0 // indirect + go.opentelemetry.io/otel/metric v1.27.0 // indirect + go.opentelemetry.io/otel/trace v1.27.0 // indirect + golang.org/x/net v0.26.0 // indirect + golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d // indirect + google.golang.org/grpc v1.65.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/scheduler-workflow/go.sum b/scheduler-workflow/go.sum new file mode 100644 index 00000000..d148b725 --- /dev/null +++ b/scheduler-workflow/go.sum @@ -0,0 +1,64 @@ +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/dapr/dapr v1.14.0 h1:SIQsNX1kH31JRDIS4k8IZ6eomM/BAcOP844PhQIT+BQ= +github.com/dapr/dapr v1.14.0/go.mod h1:oDNgaPHQIDZ3G4n4g89TElXWgkluYwcar41DI/oF4gw= +github.com/dapr/go-sdk v1.11.0 h1:clANpOQd6MsfvSa6snaX8MVk6eRx26Vsj5GxGdQ6mpE= +github.com/dapr/go-sdk v1.11.0/go.mod h1:btZ/tX8eYnx0fg3HiJUku8J5QBRXHsp3kAB1BUiTxXY= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= +github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/marusama/semaphore/v2 v2.5.0 h1:o/1QJD9DBYOWRnDhPwDVAXQn6mQYD0gZaS1Tpx6DJGM= +github.com/marusama/semaphore/v2 v2.5.0/go.mod h1:z9nMiNUekt/LTpTUQdpp+4sJeYqUGpwMHfW0Z8V8fnQ= +github.com/microsoft/durabletask-go v0.5.0 h1:4DWBgg05wnkV/VwakaiPqZ4cARvATP74ZQJFcXVMC18= +github.com/microsoft/durabletask-go v0.5.0/go.mod h1:goe2gmMgLptCijMDQ7JsekaR86KjPUG64V9JDXvKBhE= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg= +go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ= +go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik= +go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak= +go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw= +go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8 h1:yixxcjnhBmY0nkL253HFVIm0JsFHwrHdT3Yh6szTnfY= +golang.org/x/exp v0.0.0-20240613232115-7f521ea00fb8/go.mod h1:jj3sYF3dwk5D+ghuXyeI3r5MFf+NT2An6/9dOA95KSI= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d h1:k3zyW3BYYR30e8v3x0bTDdE9vpYFjZHK+HcyqkrppWk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY= +google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc= +google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/scheduler-workflow/workflow.go b/scheduler-workflow/workflow.go new file mode 100644 index 00000000..23f22020 --- /dev/null +++ b/scheduler-workflow/workflow.go @@ -0,0 +1,238 @@ +package main + +import ( + "context" + "fmt" + "log" + http2 "net/http" + "os" + "os/signal" + "sync/atomic" + "syscall" + "time" + + dapr "github.com/dapr/go-sdk/client" + "github.com/dapr/go-sdk/service/common" + "github.com/dapr/go-sdk/service/http" + "github.com/dapr/go-sdk/workflow" +) + +const appPort = ":3009" + +var stage atomic.Int64 + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := dapr.NewClient() + if err != nil { + log.Fatalf("Error getting dapr client: %v", err) + } + defer client.Close() + + daprService := http.NewService(appPort) + go func() { + log.Println("Starting app service...") + if err := daprService.Start(); err != nil && err.Error() != http2.ErrServerClosed.Error() { + log.Fatalf("error starting app service: %v", err) + } + }() + + worker, err := workflow.NewWorker() + if err != nil { + log.Fatal(err) + } + log.Println("Workflow worker initialized") + + if err := worker.RegisterWorkflow(TestWorkflow); err != nil { + log.Printf("Failed to register workflow: %v", err) + } + if err := worker.RegisterActivity(TestActivity); err != nil { + log.Printf("Failed to register activity: %v", err) + } + + // Start workflow runner + if err := worker.Start(); err != nil { + log.Printf("Failed to start worker: %v", err) + } + log.Println("Workflow worker started") + + go startLonghaulWorkflow(ctx, client) + waitForShutdown(daprService, worker, cancel) +} + +func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return nil, err + } + var output string + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output) + if err != nil { + return nil, err + } + + if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil { + return nil, err + } + + return output, nil +} + +func TestActivity(ctx workflow.ActivityContext) (any, error) { + var input int + if err := ctx.GetInput(&input); err != nil { + return "", err + } + + if stage.Load() >= 100 { + stage.Store(0) + } + + stage.Add(1) + return fmt.Sprintf("Stage: %d", stage.Load()), nil +} + +// startLonghaulWorkflow performs the following operations on a workflow: +// start, pause, resume, raise event, terminate, purge +func startLonghaulWorkflow(ctx context.Context, client dapr.Client) { + i := 0 + for { + select { + case <-ctx.Done(): + return + default: + log.Printf("Starting workflow iteration %d\n", i) + instanceID := fmt.Sprintf("longhaul-instance-%d", i) + workflowReq := &dapr.StartWorkflowRequest{ + InstanceID: instanceID, + WorkflowComponent: "", + WorkflowName: "TestWorkflow", + Input: i, + SendRawInput: false, + } + + startWfCtx, startWfCancel := context.WithTimeout(ctx, 5*time.Second) + respStart, err := client.StartWorkflowBeta1(startWfCtx, workflowReq) + startWfCancel() + if err != nil { + log.Printf("Iteration %d: Failed to start workflow: %v\n", i, err) + continue + } + log.Printf("Workflow started with ID: '%s'\n", respStart.InstanceID) + + pauseWfCtx, pauseWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.PauseWorkflowBeta1(pauseWfCtx, &dapr.PauseWorkflowRequest{ + InstanceID: instanceID, + WorkflowComponent: "", + }) + pauseWfCancel() + if err != nil { + log.Printf("Failed to pause workflow: %v\n", err) + } + log.Printf("Workflow '%s' paused\n", instanceID) + resumeWfCtx, resumeWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.ResumeWorkflowBeta1(resumeWfCtx, &dapr.ResumeWorkflowRequest{ + InstanceID: instanceID, + WorkflowComponent: "", + }) + resumeWfCancel() + if err != nil { + log.Printf("Failed to resume workflow: %v\n", err) + } + log.Printf("Workflow '%s' resumed\n", instanceID) + raiseEventWfCtx, raiseEventWfCancel := context.WithTimeout(ctx, 5*time.Second) + // Raise event to advance the workflow + err = client.RaiseEventWorkflowBeta1(raiseEventWfCtx, &dapr.RaiseEventWorkflowRequest{ + InstanceID: instanceID, + WorkflowComponent: "", + EventName: "testEvent", + EventData: "testData", + }) + raiseEventWfCancel() + if err != nil { + log.Printf("Failed to raise event: %v\n", err) + } + log.Printf("Workflow '%s' event raised\n", instanceID) + log.Printf("[wfclient] stage: %d\n", stage.Load()) + + // Wait for workflow to complete + // Poll every 5 seconds to check the workflow status + waitForWorkflowCompletion(ctx, client, instanceID) + terminateWfCtx, terminateWfCancel := context.WithTimeout(ctx, 5*time.Second) + // Terminate and purge after completion + err = client.TerminateWorkflowBeta1(terminateWfCtx, &dapr.TerminateWorkflowRequest{ + InstanceID: instanceID, + }) + terminateWfCancel() + if err != nil { + log.Printf("Failed to terminate workflow %s: %v\n", instanceID, err) + } else { + log.Printf("Workflow '%s' terminated\n", instanceID) + } + purgeWfCtx, purgeWfCancel := context.WithTimeout(ctx, 5*time.Second) + err = client.PurgeWorkflowBeta1(purgeWfCtx, &dapr.PurgeWorkflowRequest{ + InstanceID: instanceID, + }) + purgeWfCancel() + if err != nil { + log.Printf("Failed to purge workflow %s: %v\n", instanceID, err) + } else { + log.Printf("Workflow '%s' purged\n", instanceID) + } + + i++ + if i >= 100 { + i = 0 + } + } + } +} + +// waitForWorkflowCompletion polls every 5s to check the workflow status +func waitForWorkflowCompletion(ctx context.Context, client dapr.Client, instanceID string) { + for { + respGet, err := client.GetWorkflowBeta1(ctx, &dapr.GetWorkflowRequest{ + InstanceID: instanceID, + }) + if err != nil { + log.Printf("Error retrieving workflow status for %s: %v\n", instanceID, err) + continue + } + + switch respGet.RuntimeStatus { + case workflow.StatusCompleted.String(): + log.Printf("Workflow '%s' completed\n", instanceID) + return + case workflow.StatusFailed.String(): + log.Printf("Workflow '%s' failed\n", instanceID) + return + } + time.Sleep(5 * time.Second) + } +} + +// waitForShutdown keeps the app alive until an interrupt or termination signal is received +func waitForShutdown(daprService common.Service, worker *workflow.WorkflowWorker, cancelFunc context.CancelFunc) { + sigCh := make(chan os.Signal, 1) + // Notify the channel on Interrupt (Ctrl+C) or SIGTERM (for Docker/K8s graceful shutdown) + signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) + <-sigCh + + log.Println("Shutting down...") + + if err := daprService.GracefulStop(); err != nil { + log.Printf("Failed to gracefully shutdown dapr service: %v", err) + } + + if err := worker.Shutdown(); err != nil { + log.Printf("Failed to shutdown workflow worker: %v", err) + } + + cancelFunc() +}