|
| 1 | +"""End-to-end sample that demonstrates how to configure an orchestrator |
| 2 | +that a dynamic number activity functions in parallel, waits for them all |
| 3 | +to complete, and prints an aggregate summary of the outputs.""" |
| 4 | +import os |
| 5 | + |
| 6 | +from azure.identity import DefaultAzureCredential |
| 7 | + |
| 8 | +from durabletask import client, task, worker |
| 9 | +from durabletask.azuremanaged.client import DurableTaskSchedulerClient |
| 10 | +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker |
| 11 | + |
| 12 | + |
| 13 | +def activity_v1(ctx: task.ActivityContext, input: str) -> str: |
| 14 | + """Activity function that returns a result for a given work item""" |
| 15 | + print("processing input:", input) |
| 16 | + return "Success from activity v1" |
| 17 | + |
| 18 | + |
| 19 | +def activity_v2(ctx: task.ActivityContext, input: str) -> str: |
| 20 | + """Activity function that returns a result for a given work item""" |
| 21 | + print("processing input:", input) |
| 22 | + return "Success from activity v2" |
| 23 | + |
| 24 | + |
| 25 | +def orchestrator(ctx: task.OrchestrationContext, _): |
| 26 | + """Orchestrator function that checks the orchestration version and has version-aware behavior |
| 27 | + Use case: Updating an orchestrator with new logic while maintaining compatibility with previously |
| 28 | + started orchestrations""" |
| 29 | + if ctx.version == "1.0.0": |
| 30 | + # For version 1.0.0, we use the original logic |
| 31 | + result: int = yield ctx.call_activity(activity_v1, input="input for v1") |
| 32 | + elif ctx.version == "2.0.0": |
| 33 | + # For version 2.0.0, we use the updated logic |
| 34 | + result: int = yield ctx.call_activity(activity_v2, input="input for v2") |
| 35 | + else: |
| 36 | + raise ValueError(f"Unsupported version: {ctx.version}") |
| 37 | + return { |
| 38 | + 'result': result, |
| 39 | + } |
| 40 | + |
| 41 | + |
| 42 | +# Use environment variables if provided, otherwise use default emulator values |
| 43 | +taskhub_name = os.getenv("TASKHUB", "default") |
| 44 | +endpoint = os.getenv("ENDPOINT", "http://localhost:8080") |
| 45 | + |
| 46 | +print(f"Using taskhub: {taskhub_name}") |
| 47 | +print(f"Using endpoint: {endpoint}") |
| 48 | + |
| 49 | +# Set credential to None for emulator, or DefaultAzureCredential for Azure |
| 50 | +credential = None if endpoint == "http://localhost:8080" else DefaultAzureCredential() |
| 51 | + |
| 52 | +# configure and start the worker - use secure_channel=False for emulator |
| 53 | +secure_channel = endpoint != "http://localhost:8080" |
| 54 | +with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=secure_channel, |
| 55 | + taskhub=taskhub_name, token_credential=credential) as w: |
| 56 | + # This worker is versioned for v2, as the orchestrator code has already been updated |
| 57 | + # CURRENT_OR_OLDER allows this worker to process orchestrations versioned below 2.0.0 - e.g. 1.0.0 |
| 58 | + w.use_versioning(worker.VersioningOptions( |
| 59 | + version="2.0.0", |
| 60 | + default_version="2.0.0", |
| 61 | + match_strategy=worker.VersionMatchStrategy.CURRENT_OR_OLDER, |
| 62 | + failure_strategy=worker.VersionFailureStrategy.FAIL |
| 63 | + )) |
| 64 | + w.add_orchestrator(orchestrator) |
| 65 | + w.add_activity(activity_v1) |
| 66 | + w.add_activity(activity_v2) |
| 67 | + w.start() |
| 68 | + |
| 69 | + # create a client, start an orchestration, and wait for it to finish |
| 70 | + # The client's version matches the worker's version |
| 71 | + c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=secure_channel, |
| 72 | + taskhub=taskhub_name, token_credential=credential, |
| 73 | + default_version="2.0.0") |
| 74 | + # Schedule a new orchestration manually versioned to 1.0.0 |
| 75 | + # Normally, this would have been scheduled before the worker started from a worker also versioned v1.0.0, |
| 76 | + # Here we are doing it manually to avoid creating two workers |
| 77 | + instance_id_v1 = c.schedule_new_orchestration(orchestrator, version="1.0.0") |
| 78 | + state_v1 = c.wait_for_orchestration_completion(instance_id_v1, timeout=30) |
| 79 | + if state_v1 and state_v1.runtime_status == client.OrchestrationStatus.COMPLETED: |
| 80 | + print(f'Orchestration v1 completed! Result: {state_v1.serialized_output}') |
| 81 | + elif state_v1: |
| 82 | + print(f'Orchestration v1 failed: {state_v1.failure_details}') |
| 83 | + |
| 84 | + # Also check that the orchestrator can be run with the current version |
| 85 | + instance_id = c.schedule_new_orchestration(orchestrator) |
| 86 | + state = c.wait_for_orchestration_completion(instance_id, timeout=30) |
| 87 | + if state and state.runtime_status == client.OrchestrationStatus.COMPLETED: |
| 88 | + print(f'Orchestration completed! Result: {state.serialized_output}') |
| 89 | + elif state: |
| 90 | + print(f'Orchestration failed: {state.failure_details}') |
| 91 | + |
| 92 | + exit() |
0 commit comments