diff --git a/codeblocks/codeblocks.json b/codeblocks/codeblocks.json index b9c16aad..ae49c999 100644 --- a/codeblocks/codeblocks.json +++ b/codeblocks/codeblocks.json @@ -1 +1 @@ -{"https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1":"{`export response=\\`curl -s -X POST $CONDUCTOR_SERVER_URL/token -H 'Content-Type:application/json' -d '{\n\t\"keyId\": \"'\"$KEY\"'\",\n\t\"keySecret\": \"'\"$SECRET\"'\"\n}'\\`\n\nif [[ \"$response\" != *'token'* ]]; then\n echo \"Unable to generate the auth header. Please check KEY, SECRET and CONDUCTOR_SERVER_URL variables\"\n echo \"Server response:\"\n echo $response\n exit 1\nfi\n\nexport token=\\`echo $response | cut -d '\"' -f4\\`\n\nfor FILE in main/resources/workflows/*;\n do\n echo \"Deploying @$FILE\";\n\n curl -X POST $CONDUCTOR_SERVER_URL/metadata/workflow?overwrite=true \\\n -H \"X-Authorization: $token\" \\\n -H \"accept: */*\" \\\n -H \"Content-Type: application/json\" \\\n -d @$FILE\n done\n`}","https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1-lines":"#L8-L32","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1":"{`\n /**\n * Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n */\n @WorkerTask(value = \"fraud-check\", threadCount = 5, pollingInterval = 200)\n public FraudCheckResult checkForFraudTask(DepositDetail depositDetail) {\n return fraudCheckService.checkForFraud(depositDetail);\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1-lines":"#L27-L35","https://github.com/conductor-sdk/python-sdk-examples/blob/upgrade-sdk-version-1.0.66/examples/worker/workers.py---1":"{`@WorkerTask(task_definition_name='fraud_check', poll_interval_seconds=0.5)\ndef get_user_info(task: Task) -> UserInfo:\n userId = task.input_data['userId']\n return UserInfo(name='User X', id=userId)`}","https://github.com/conductor-sdk/python-sdk-examples/blob/upgrade-sdk-version-1.0.66/examples/worker/workers.py---1-lines":"#L10-L13","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1":"{` // Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n [WorkerTask(taskType: \"fraud-check\", batchSize: 5, domain: null, pollIntervalMs: 200, workerId: \"workerId\")]\n public TaskResult FraudWorker(Task task)\n {\n var depositDetail = (DepositDetail)task.InputData[\"depositDetail\"];\n var fraudCheckResult = _fraudCheckService.CheckForFraud(depositDetail);\n var result = task.Completed();\n result.OutputData = Examples.Util.TypeUtil.GetDictionaryFromObject(fraudCheckResult);\n return result;\n }`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1-lines":"#L25-L34","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1":"{`const fraudCheckWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const { amount, accountId } = inputData;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 1, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1-lines":"#L4-L17","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1":"{`export const fraudCheckWorker: ConductorWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const amount = inputData?.amount;\n const accountId = inputData?.accountId;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n domain: \"fraud\", // Optional\n pollInterval: 100, // Optional can be specified in the TaskManager\n concurrency: 2, // Optional can be specified in the TaskManager\n};`}","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1-lines":"#L5-L19","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2":"{`\n @WorkerTask(value = \"retrieve-deposit-batch\", threadCount = 5, pollingInterval = 200)\n public List retrieveDepositBatch(@InputParam(\"batchCount\") Integer batchCount) {\n if (batchCount == null) {\n batchCount = random.nextInt(5, 11);\n }\n batchCount = Math.min(100, batchCount); // Limit to 100 in playground\n List depositDetails = IntStream.range(0, batchCount)\n .mapToObj(i -> DepositDetail.builder()\n .accountId(\"acc-id-\" + i)\n .amount(BigDecimal.valueOf(i * 1500L)) // Create random amounts\n .build())\n .toList();\n log.info(\"Returning {} transactions\", depositDetails.size());\n return depositDetails;\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2-lines":"#L40-L56","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1":"{`\n StartWorkflowRequest request = new StartWorkflowRequest();\n request.setName(\"deposit_payment\");\n Map inputData = new HashMap<>();\n inputData.put(\"amount\", depositDetail.getAmount());\n inputData.put(\"accountId\", depositDetail.getAccountId());\n request.setInput(inputData);\n\n String workflowId = workflowClient.startWorkflow(request);\n log.info(\"Workflow id: {}\", workflowId);\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1-lines":"#L22-L32","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1":"{` var request = new StartWorkflowRequest\n {\n Name = WORKFLOW_NAME,\n Version = WORKFLOW_VERSION,\n Input = Examples.Util.TypeUtil.GetDictionaryFromObject(depositDetail)\n };\n var workflowId = _workflowClient.StartWorkflow(request);\n Console.WriteLine($\"Started deposit workflow id: {workflowId}\");\n return workflowId;`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1-lines":"#L23-L31","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1":"{` @PostMapping(value = \"/triggerDepositFlow\", produces = \"application/json\")\n public ResponseEntity> triggerDepositFlow(@RequestBody DepositDetail depositDetail) {\n log.info(\"Starting deposit flow for: {}\", depositDetail);\n return ResponseEntity.ok(workflowService.startDepositWorkflow(depositDetail));\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1-lines":"#L32-L37","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1":"{` @Override\n public TaskResult execute(Task task) {\n TaskResult taskResult = new TaskResult(task);\n if (!task.getInputData().containsKey(POLL_COUNTER)) {\n taskResult.addOutputData(\"message\", \"pollCounter param not found in input, will use default of \" + defaultPollCount + \" polls\");\n }\n\n int pollCounter = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_COUNTER, defaultPollCount)));\n int pollIntervalSeconds = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_INTERVAL_SECONDS, 5)));\n\n // Add these to the output for context\n taskResult.addOutputData(POLL_INTERVAL_SECONDS, pollIntervalSeconds + \" (this test task has a max limit of 10 seconds)\");\n taskResult.addOutputData(POLL_COUNTER, pollCounter + \" (this test task has a max limit of 10 iterations)\");\n\n // We can read current iteration from the task output as the data will be retained on the worker when polled\n int currentIteration = castToInt(taskResult.getOutputData().getOrDefault(CURRENT_ITERATION, 0));\n\n // Increment the current iteration and set to the task output\n taskResult.addOutputData(CURRENT_ITERATION, ++currentIteration);\n taskResult.addOutputData(\"updatedTime\", new Date().toString());\n\n // While condition is not met, keep task in progress\n if (currentIteration < pollCounter) {\n taskResult.setStatus(TaskResult.Status.IN_PROGRESS);\n // Set to configured seconds to callback, and you can set this to any value as per the requirements\n taskResult.setCallbackAfterSeconds(pollIntervalSeconds);\n return taskResult;\n }\n\n // Set task as completed now that the poll count condition is met\n taskResult.setStatus(TaskResult.Status.COMPLETED);\n return taskResult;\n }`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1-lines":"#L24-L56"} \ No newline at end of file +{"https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1":"{`export response=\\`curl -s -X POST $CONDUCTOR_SERVER_URL/token -H 'Content-Type:application/json' -d '{\n\t\"keyId\": \"'\"$KEY\"'\",\n\t\"keySecret\": \"'\"$SECRET\"'\"\n}'\\`\n\nif [[ \"$response\" != *'token'* ]]; then\n echo \"Unable to generate the auth header. Please check KEY, SECRET and CONDUCTOR_SERVER_URL variables\"\n echo \"Server response:\"\n echo $response\n exit 1\nfi\n\nexport token=\\`echo $response | cut -d '\"' -f4\\`\n\nfor FILE in main/resources/workflows/*;\n do\n echo \"Deploying @$FILE\";\n\n curl -X POST $CONDUCTOR_SERVER_URL/metadata/workflow?overwrite=true \\\n -H \"X-Authorization: $token\" \\\n -H \"accept: */*\" \\\n -H \"Content-Type: application/json\" \\\n -d @$FILE\n done\n`}","https://github.com/orkes-io/workflow-cicd/blob/main/src/deploy_workflows.sh---1-lines":"#L8-L32","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1":"{`\n /**\n * Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n */\n @WorkerTask(value = \"fraud-check\", threadCount = 5, pollingInterval = 200)\n public FraudCheckResult checkForFraudTask(DepositDetail depositDetail) {\n return fraudCheckService.checkForFraud(depositDetail);\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---1-lines":"#L27-L35","https://github.com/conductor-sdk/python-sdk-examples/blob/upgrade-sdk-version-1.0.66/examples/worker/workers.py---1":"{`@WorkerTask(task_definition_name='fraud_check', poll_interval_seconds=0.5)\ndef get_user_info(task: Task) -> UserInfo:\n userId = task.input_data['userId']\n return UserInfo(name='User X', id=userId)`}","https://github.com/conductor-sdk/python-sdk-examples/blob/upgrade-sdk-version-1.0.66/examples/worker/workers.py---1-lines":"#L10-L13","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1":"{` // Note: Using this setting, up to 5 tasks will run in parallel, with tasks being polled every 200ms\n [WorkerTask(taskType: \"fraud-check\", batchSize: 5, domain: null, pollIntervalMs: 200, workerId: \"workerId\")]\n public TaskResult FraudWorker(Task task)\n {\n var depositDetail = (DepositDetail)task.InputData[\"depositDetail\"];\n var fraudCheckResult = _fraudCheckService.CheckForFraud(depositDetail);\n var result = task.Completed();\n result.OutputData = Examples.Util.TypeUtil.GetDictionaryFromObject(fraudCheckResult);\n return result;\n }`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Worker/Workers.cs---1-lines":"#L25-L34","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1":"{`const fraudCheckWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const { amount, accountId } = inputData;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n};`}","https://github.com/conductor-sdk/javascript-sdk-examples/blob/main/src/banking/workers/workers.js---1-lines":"#L4-L14","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1":"{`export const fraudCheckWorker: ConductorWorker = {\n taskDefName: \"fraud-check\",\n execute: async ({ inputData }) => {\n const amount = inputData?.amount;\n const accountId = inputData?.accountId;\n const fraudResult = fraudService.isFraudulentTxn(accountId, amount);\n return {\n outputData: fraudResult,\n status: \"COMPLETED\",\n };\n },\n};`}","https://github.com/conductor-sdk/typescript-examples/blob/main/src/banking/workers/workers.ts---1-lines":"#L5-L16","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2":"{`\n @WorkerTask(value = \"retrieve-deposit-batch\", threadCount = 5, pollingInterval = 200)\n public List retrieveDepositBatch(@InputParam(\"batchCount\") Integer batchCount) {\n if (batchCount == null) {\n batchCount = random.nextInt(5, 11);\n }\n batchCount = Math.min(100, batchCount); // Limit to 100 in playground\n List depositDetails = IntStream.range(0, batchCount)\n .mapToObj(i -> DepositDetail.builder()\n .accountId(\"acc-id-\" + i)\n .amount(BigDecimal.valueOf(i * 1500L)) // Create random amounts\n .build())\n .toList();\n log.info(\"Returning {} transactions\", depositDetails.size());\n return depositDetails;\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/ConductorWorkers.java---2-lines":"#L40-L56","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1":"{`\n StartWorkflowRequest request = new StartWorkflowRequest();\n request.setName(\"deposit_payment\");\n Map inputData = new HashMap<>();\n inputData.put(\"amount\", depositDetail.getAmount());\n inputData.put(\"accountId\", depositDetail.getAccountId());\n request.setInput(inputData);\n\n String workflowId = workflowClient.startWorkflow(request);\n log.info(\"Workflow id: {}\", workflowId);\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/service/WorkflowService.java---1-lines":"#L22-L32","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1":"{` var request = new StartWorkflowRequest\n {\n Name = WORKFLOW_NAME,\n Version = WORKFLOW_VERSION,\n Input = Examples.Util.TypeUtil.GetDictionaryFromObject(depositDetail)\n };\n var workflowId = _workflowClient.StartWorkflow(request);\n Console.WriteLine($\"Started deposit workflow id: {workflowId}\");\n return workflowId;`}","https://github.com/conductor-sdk/csharp-sdk-examples/blob/main/Examples/Service/WorkflowService.cs---1-lines":"#L23-L31","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1":"{` @PostMapping(value = \"/triggerDepositFlow\", produces = \"application/json\")\n public ResponseEntity> triggerDepositFlow(@RequestBody DepositDetail depositDetail) {\n log.info(\"Starting deposit flow for: {}\", depositDetail);\n return ResponseEntity.ok(workflowService.startDepositWorkflow(depositDetail));\n }\n`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/controller/BankingApiController.java---1-lines":"#L32-L37","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1":"{` @Override\n public TaskResult execute(Task task) {\n TaskResult taskResult = new TaskResult(task);\n if (!task.getInputData().containsKey(POLL_COUNTER)) {\n taskResult.addOutputData(\"message\", \"pollCounter param not found in input, will use default of \" + defaultPollCount + \" polls\");\n }\n\n int pollCounter = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_COUNTER, defaultPollCount)));\n int pollIntervalSeconds = Math.min(10, castToInt(task.getInputData().getOrDefault(POLL_INTERVAL_SECONDS, 5)));\n\n // Add these to the output for context\n taskResult.addOutputData(POLL_INTERVAL_SECONDS, pollIntervalSeconds + \" (this test task has a max limit of 10 seconds)\");\n taskResult.addOutputData(POLL_COUNTER, pollCounter + \" (this test task has a max limit of 10 iterations)\");\n\n // We can read current iteration from the task output as the data will be retained on the worker when polled\n int currentIteration = castToInt(taskResult.getOutputData().getOrDefault(CURRENT_ITERATION, 0));\n\n // Increment the current iteration and set to the task output\n taskResult.addOutputData(CURRENT_ITERATION, ++currentIteration);\n taskResult.addOutputData(\"updatedTime\", new Date().toString());\n\n // While condition is not met, keep task in progress\n if (currentIteration < pollCounter) {\n taskResult.setStatus(TaskResult.Status.IN_PROGRESS);\n // Set to configured seconds to callback, and you can set this to any value as per the requirements\n taskResult.setCallbackAfterSeconds(pollIntervalSeconds);\n return taskResult;\n }\n\n // Set task as completed now that the poll count condition is met\n taskResult.setStatus(TaskResult.Status.COMPLETED);\n return taskResult;\n }`}","https://github.com/conductor-sdk/orkes-java-springboot2-example/blob/main/src/main/java/io/orkes/example/banking/workers/PollUntilConditionMeetsWorker.java---1-lines":"#L24-L56"} \ No newline at end of file diff --git a/docs/reference-docs/operators/fork-join.md b/docs/reference-docs/operators/fork-join.md index 9d5d31a8..2f629f33 100644 --- a/docs/reference-docs/operators/fork-join.md +++ b/docs/reference-docs/operators/fork-join.md @@ -6,7 +6,12 @@ import Tabs from '@theme/Tabs'; import TabItem from '@theme/TabItem'; # Fork/Join -A Fork operation lets you run a specified list of tasks or sub-workflows in parallel. A fork task is followed by a join operation that waits on the forked tasks or sub-workflows to finish. The **JOIN** task also collects outputs from each of the forked tasks or sub workflows. + +A Fork/Join task can be used when you need to run tasks in parallel. It contains two components, the **fork**, and the **join** part. A fork operation lets you run a specified list of tasks in parallel. A fork task is followed by a join operation that waits on the forked tasks to finish. The JOIN task also collects outputs from each of the forked tasks. + +:::note +You can also add [Sub Workflows](https://orkes.io/content/reference-docs/operators/sub-workflow) as forks. +::: ## Definitions @@ -29,8 +34,8 @@ A Fork operation lets you run a specified list of tasks or sub-workflows in para ], } ``` -* A **FORK_JOIN** task has a **forkTasks** attribute that expects an array. Each array is a sub-list of tasks. Each of these sub-lists is then invoked in parallel. The tasks defined within each sublist can be sequential or any other way as desired. -* A FORK_JOIN task has to be followed by a JOIN operation. The **JOIN** operator specifies which of the forked tasks to **joinOn** (waits for completion) before moving to the next stage in the workflow. +* A Fork-Join task has an attribute called **forkTasks**, an array containing the task list. Each of these tasks is invoked in parallel. The tasks defined within each sublist can be sequential or any other way as desired.. +* The forks are followed by a Join task, which specifies which forks should be joined before moving to the next stage in the workflow. ### Input Parameters @@ -144,7 +149,7 @@ Imagine a workflow that sends three notifications: email, SMS, and HTTP. Since n The diagram will appear as follows:

Fork Join Example

-Here's the JSON definition for the workflow: +Here each of the forks (email/SMS/HTTP) runs in parallel, meaning that they are run independently. Here's the JSON definition for the workflow: ```json [ @@ -202,10 +207,11 @@ Here's the JSON definition for the workflow: } ] ``` -:::note -There are three parallel 'tines' to this fork, but only two outputs are required for the JOIN to continue. The diagram does draw an arrow from **http_notification_ref** to the **notification_join**, but it is not required for the workflow to continue. -::: -Here is what the output of notification_join will look like. The output is a map, where the keys are the names of task references being joined. The corresponding values are the outputs of those tasks. +In this example, although we have 3 forks running in parallel, we require only 2 outputs to continue with the workflow. The parameter **joinOn** is defined so that only email and SMS tasks are to be joined, omitting HTTP tasks as optional for completion. + +This workflow is completed when the email and SMS notifications are sent and does not depend on the HTTP notification status. + +Here is what the output of **notification_join** will look like. The output is a map, where the keys are the names of task references being joined. The corresponding values are the outputs of those tasks. ```json