86 changes: 86 additions & 0 deletions docs/features.md
@@ -48,6 +48,92 @@ Orchestrations can schedule durable timers using the `create_timer` API. These t

Orchestrations can start child orchestrations using the `call_sub_orchestrator` API. Child orchestrations are useful for encapsulating complex logic and for breaking up large orchestrations into smaller, more manageable pieces. Sub-orchestrations can also be versioned in a similar manner to their parent orchestrations; however, they do not inherit the parent orchestrator's version. Instead, they use the `default_version` defined in the current worker's `VersioningOptions` unless a version is specified in the call to `call_sub_orchestrator`.

### Entities

#### Concepts

Durable Entities provide a way to model small, stateful objects within your orchestration workflows. Each entity has a unique identity and maintains its own state, which is persisted durably. Entities can be interacted with by sending them operations (messages) that mutate or query their state. These operations are processed sequentially, ensuring consistency. Examples of uses for durable entities include counters, accumulators, or any other operation which requires state to persist across orchestrations.

Entities can be invoked from durable clients directly, or from durable orchestrators. They support features like automatic state persistence, concurrency control, and can be locked for exclusive access during critical operations.

Entities are accessed by a unique ID, implemented here as `EntityInstanceId`. This ID is composed of two parts: an entity name, which refers to the function or class that defines the behavior of the entity, and a key, which is any string defined in your code. Each entity instance, represented by a distinct `EntityInstanceId`, has its own state.
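
As a rough illustration of the two-part identity, the sketch below uses a stand-in `SketchEntityId` class and an in-memory dictionary. Both are hypothetical and not part of the SDK; the real `EntityInstanceId` and the way the Task Hub stores state may differ. The point is only that the same entity name with different keys addresses different state.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class SketchEntityId:
    """Hypothetical stand-in for EntityInstanceId: an entity name plus a key."""
    entity: str
    key: str


# Hypothetical in-memory store: one state slot per distinct (entity, key) pair.
states: Dict[SketchEntityId, int] = {}


def add(entity_id: SketchEntityId, amount: int) -> int:
    """Add to the state held for this specific entity instance."""
    states[entity_id] = states.get(entity_id, 0) + amount
    return states[entity_id]


alice = SketchEntityId("counter", "alice")
bob = SketchEntityId("counter", "bob")
add(alice, 5)
add(alice, 2)
add(bob, 1)  # same entity name, different key: separate state
```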

#### Syntax

##### Defining Entities

Entities can be defined using either function-based or class-based syntax.

```python
from durabletask import entities

# Function-based entity
def counter(ctx: entities.EntityContext, input: int):
    state = ctx.get_state(int, 0)
    if ctx.operation == "add":
        state += input
        ctx.set_state(state)
    elif ctx.operation == "get":
        return state

# Class-based entity
class Counter(entities.DurableEntity):
    def __init__(self):
        self.set_state(0)

    def add(self, amount: int):
        self.set_state(self.get_state(int, 0) + amount)

    def get(self):
        return self.get_state(int, 0)
```

> Note that the object properties of class-based entities may not be preserved across invocations. Use the inherited `get_state` and `set_state` methods to access the persisted entity data.
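
To make the note above concrete, here is a minimal sketch of why plain attributes are unreliable. It assumes the worker may build a fresh object for each operation; `FakeContext` and `SketchCounter` are hypothetical stand-ins, not SDK classes, and the real lifecycle may differ.

```python
from typing import Any, Optional


class FakeContext:
    """Hypothetical stand-in for the persisted state behind an entity."""

    def __init__(self) -> None:
        self._state: Any = None

    def get_state(self, intended_type: Optional[type] = None, default: Any = None) -> Any:
        return default if self._state is None else self._state

    def set_state(self, new_state: Any) -> None:
        self._state = new_state


class SketchCounter:
    """Hypothetical entity class; a fresh object is built for every operation."""

    def __init__(self, ctx: FakeContext) -> None:
        self.ctx = ctx
        self.cached_total = 0  # plain attribute: lost when the object is discarded

    def add(self, amount: int) -> None:
        self.cached_total += amount
        self.ctx.set_state(self.ctx.get_state(int, 0) + amount)


ctx = FakeContext()          # survives across invocations, like durable state
SketchCounter(ctx).add(5)    # first invocation
second = SketchCounter(ctx)  # second invocation gets a brand-new object
second.add(2)
```

After both invocations the persisted state holds the full total, while `second.cached_total` only reflects the last call.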

##### Invoking entities

Entities are invoked using the `signal_entity` or `call_entity` APIs. The Durable Client only allows `signal_entity`:

```python
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
                               taskhub=taskhub_name, token_credential=None)
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
c.signal_entity(entity_id, "do_nothing")
```

Whereas orchestrators can choose to use `signal_entity` or `call_entity`:

```python
# Signal an entity (fire-and-forget)
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
ctx.signal_entity(entity_id, operation_name="add", input=5)

# Call an entity (wait for result)
entity_id = entities.EntityInstanceId("my_entity_function", "myEntityId")
result = yield ctx.call_entity(entity_id, operation_name="get")
```
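
The difference between the two styles can be sketched without a running worker. The example below is a simplified model, not SDK code: `FakeTask`, `FakeOrchestrationContext`, and `run` are hypothetical helpers, entity IDs are plain strings, and the fake applies signals immediately, whereas a real signal is delivered asynchronously.

```python
from typing import Any, Generator


class FakeTask:
    """Hypothetical placeholder for the task object an orchestrator yields."""

    def __init__(self, result: Any) -> None:
        self.result = result


class FakeOrchestrationContext:
    """Hypothetical context backed by a single in-memory counter."""

    def __init__(self) -> None:
        self.total = 0

    def signal_entity(self, entity_id: str, operation_name: str, input: Any = None) -> None:
        # Fire-and-forget: nothing is returned to the orchestrator.
        if operation_name == "add":
            self.total += input

    def call_entity(self, entity_id: str, operation_name: str, input: Any = None) -> FakeTask:
        # Request/response: the orchestrator yields the task to receive a result.
        if operation_name == "add":
            self.total += input
        return FakeTask(self.total)


def orchestrator(ctx: FakeOrchestrationContext) -> Generator[FakeTask, Any, int]:
    ctx.signal_entity("counter-demo", operation_name="add", input=5)
    total = yield ctx.call_entity("counter-demo", operation_name="get")
    return total


def run(gen: Generator[FakeTask, Any, Any]) -> Any:
    """Drive the generator, sending each task's result back in."""
    try:
        task = next(gen)
        while True:
            task = gen.send(task.result)
    except StopIteration as stop:
        return stop.value


final_total = run(orchestrator(FakeOrchestrationContext()))
```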

##### Entity actions

Entities can perform actions such as signaling other entities or starting new orchestrations:

- `ctx.signal_entity(entity_id, operation, input)`
- `ctx.schedule_new_orchestration(orchestrator_name, input)`
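
A function-based entity using both actions might look like the sketch below. `RecordingContext` is a hypothetical stand-in that only records what the entity asks for; the entity, orchestration, and instance names are made up for illustration, and entity IDs are plain strings here rather than `EntityInstanceId` objects.

```python
import uuid
from typing import Any, List, Optional, Tuple


class RecordingContext:
    """Hypothetical stand-in for EntityContext that only records requested actions."""

    def __init__(self, operation: str) -> None:
        self.operation = operation
        self.actions: List[Tuple[str, str, str]] = []

    def signal_entity(self, entity_instance_id: Any, operation: str,
                      input: Optional[Any] = None) -> None:
        self.actions.append(("signal", str(entity_instance_id), operation))

    def schedule_new_orchestration(self, orchestration_name: str,
                                   input: Optional[Any] = None,
                                   instance_id: Optional[str] = None) -> str:
        # Generate an ID when the caller does not supply one.
        instance_id = instance_id or uuid.uuid4().hex
        self.actions.append(("orchestration", orchestration_name, instance_id))
        return instance_id


def order(ctx: RecordingContext, input: Any):
    """Hypothetical entity that fans out work when an order is placed."""
    if ctx.operation == "place":
        ctx.signal_entity("inventory-widgets", "reserve", input)
        return ctx.schedule_new_orchestration("ship_order", input, instance_id="ship-1")


ctx = RecordingContext("place")
new_instance_id = order(ctx, 3)
```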

##### Locking and concurrency

Because entities can be accessed from multiple running orchestrations at the same time, an orchestrator can lock one or more entities to gain exclusive access for the duration of the lock (also known as a critical section). The idea is similar to a semaphore:

```python
with (yield ctx.lock_entities([entity_id_1, entity_id_2])):
    # Perform entity call operations that require exclusive access
    ...
```

Note that locked entities may not be signalled, and every call to a locked entity must return a result before another call to the same entity may be made from within the critical section. For more details and advanced usage, see the examples and API documentation.
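
As an illustration of a critical section, the sketch below moves an amount between two entities while both are locked, awaiting each call before making the next. It is a simplified model under stated assumptions: `FakeTask`, `FakeOrchestrationContext`, and `run` are hypothetical helpers, entity IDs are plain strings, and the lock is released when the `with` block exits.

```python
from contextlib import contextmanager
from typing import Any, Dict, Generator, List, Set


class FakeTask:
    """Hypothetical placeholder for the task object an orchestrator yields."""

    def __init__(self, result: Any) -> None:
        self.result = result


class FakeOrchestrationContext:
    """Hypothetical context: balances live in a dict, locks in a set."""

    def __init__(self, balances: Dict[str, int]) -> None:
        self.balances = balances
        self.locked: Set[str] = set()
        self.lock_log: List[str] = []

    def lock_entities(self, entity_ids: List[str]) -> FakeTask:
        @contextmanager
        def critical_section():
            self.locked.update(entity_ids)
            self.lock_log.append("acquired")
            try:
                yield
            finally:
                self.locked.difference_update(entity_ids)
                self.lock_log.append("released")
        return FakeTask(critical_section())

    def call_entity(self, entity_id: str, operation_name: str, input: int = 0) -> FakeTask:
        if operation_name == "add":
            self.balances[entity_id] += input
        return FakeTask(self.balances[entity_id])


def transfer(ctx: FakeOrchestrationContext, source: str, target: str, amount: int):
    with (yield ctx.lock_entities([source, target])):
        balance = yield ctx.call_entity(source, "get")
        if balance < amount:
            return False
        # Each call is awaited before the next one is issued.
        yield ctx.call_entity(source, "add", -amount)
        yield ctx.call_entity(target, "add", amount)
    return True


def run(gen: Generator[FakeTask, Any, Any]) -> Any:
    """Drive the generator, sending each task's result back in."""
    try:
        task = next(gen)
        while True:
            task = gen.send(task.result)
    except StopIteration as stop:
        return stop.value


ctx = FakeOrchestrationContext({"a": 100, "b": 10})
succeeded = run(transfer(ctx, "a", "b", 30))
```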

##### Deleting entities

Entities are represented as orchestration instances in your Task Hub, and their state is persisted in the Task Hub as well. When using the Durable Task Scheduler as your durability provider, the backend automatically cleans up entities whose state is empty. Clearing the state is therefore effectively the "delete" operation, and it saves space in the Task Hub. In the DTS Dashboard, "delete entity" simply signals the entity with the "delete" operation. For class-based entities, this SDK provides a default implementation of the "delete" operation that clears the state, which you are free to override as needed. For function-based entities, you must implement "delete" yourself.
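
For function-based entities, a manual "delete" could be handled as in the sketch below. It assumes that setting the state to `None` counts as an empty state, mirroring what the default class-based `delete` does; `FakeEntityContext` is a hypothetical stand-in for `EntityContext` so the example runs on its own.

```python
from typing import Any, Optional


class FakeEntityContext:
    """Hypothetical stand-in exposing the parts of EntityContext used below."""

    def __init__(self, operation: str, state: Any = None) -> None:
        self.operation = operation
        self._state = state

    def get_state(self, intended_type: Optional[type] = None, default: Any = None) -> Any:
        return default if self._state is None else self._state

    def set_state(self, new_state: Any) -> None:
        self._state = new_state


def counter(ctx: FakeEntityContext, input: Optional[int] = None):
    state = ctx.get_state(int, 0)
    if ctx.operation == "add":
        ctx.set_state(state + input)
    elif ctx.operation == "get":
        return state
    elif ctx.operation == "delete":
        # Assumption: a None state is treated as empty by the backend.
        ctx.set_state(None)


ctx = FakeEntityContext("add")
counter(ctx, 5)
ctx.operation = "get"
before_delete = counter(ctx)
ctx.operation = "delete"
counter(ctx)
```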

### External events

Orchestrations can wait for external events using the `wait_for_external_event` API. External events are useful for implementing human interaction patterns, such as waiting for a user to approve an order before continuing.
16 changes: 15 additions & 1 deletion durabletask/client.py
@@ -4,13 +4,14 @@
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional, Sequence, TypeVar, Union

import grpc
from google.protobuf import wrappers_pb2

from durabletask.entities import EntityInstanceId
import durabletask.internal.helpers as helpers
import durabletask.internal.orchestrator_service_pb2 as pb
import durabletask.internal.orchestrator_service_pb2_grpc as stubs
@@ -227,3 +228,16 @@ def purge_orchestration(self, instance_id: str, recursive: bool = True):
        req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
        self._logger.info(f"Purging instance '{instance_id}'.")
        self._stub.PurgeInstances(req)

    def signal_entity(self, entity_instance_id: EntityInstanceId, operation_name: str, input: Optional[Any] = None):
        req = pb.SignalEntityRequest(
            instanceId=str(entity_instance_id),
            name=operation_name,
            input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input else None,
            requestId=str(uuid.uuid4()),
            scheduledTime=None,
            parentTraceContext=None,
            requestTime=helpers.new_timestamp(datetime.now(timezone.utc))
        )
        self._logger.info(f"Signaling entity '{entity_instance_id}' operation '{operation_name}'.")
        self._stub.SignalEntity(req, None)  # TODO: Cancellation timeout?
13 changes: 13 additions & 0 deletions durabletask/entities/__init__.py
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Durable Task SDK for Python entities component"""

from durabletask.entities.entity_instance_id import EntityInstanceId
from durabletask.entities.durable_entity import DurableEntity
from durabletask.entities.entity_lock import EntityLock
from durabletask.entities.entity_context import EntityContext

__all__ = ["EntityInstanceId", "DurableEntity", "EntityLock", "EntityContext"]

PACKAGE_NAME = "durabletask.entities"
93 changes: 93 additions & 0 deletions durabletask/entities/durable_entity.py
@@ -0,0 +1,93 @@
from typing import Any, Optional, Type, TypeVar, Union, overload

from durabletask.entities.entity_context import EntityContext
from durabletask.entities.entity_instance_id import EntityInstanceId

TState = TypeVar("TState")


class DurableEntity:
    def _initialize_entity_context(self, context: EntityContext):
        self.entity_context = context

    @overload
    def get_state(self, intended_type: Type[TState], default: TState) -> TState:
        ...

    @overload
    def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
        ...

    @overload
    def get_state(self, intended_type: None = None, default: Any = None) -> Any:
        ...

    def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Union[None, TState, Any]:
        """Get the current state of the entity, optionally converting it to a specified type.

        Parameters
        ----------
        intended_type : Type[TState] | None, optional
            The type to which the state should be converted. If None, the state is returned as-is.
        default : TState, optional
            The default value to return if the state is not found or cannot be converted.

        Returns
        -------
        TState | Any
            The current state of the entity, optionally converted to the specified type.
        """
        return self.entity_context.get_state(intended_type, default)

    def set_state(self, state: Any):
        """Set the state of the entity to a new value.

        Parameters
        ----------
        state : Any
            The new state to set for the entity.
        """
        self.entity_context.set_state(state)

    def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Optional[Any] = None) -> None:
        """Signal another entity to perform an operation.

        Parameters
        ----------
        entity_instance_id : EntityInstanceId
            The ID of the entity instance to signal.
        operation : str
            The operation to perform on the entity.
        input : Any, optional
            The input to provide to the entity for the operation.
        """
        self.entity_context.signal_entity(entity_instance_id, operation, input)

    def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
        """Schedule a new orchestration instance.

        Parameters
        ----------
        orchestration_name : str
            The name of the orchestration to schedule.
        input : Any, optional
            The input to provide to the new orchestration.
        instance_id : str, optional
            The instance ID to assign to the new orchestration. If None, a new ID will be generated.

        Returns
        -------
        str
            The instance ID of the scheduled orchestration.
        """
        return self.entity_context.schedule_new_orchestration(orchestration_name, input, instance_id=instance_id)

    def delete(self, input: Any = None) -> None:
        """Delete the entity instance.

        Parameters
        ----------
        input : Any, optional
            Unused: The input for the entity "delete" operation.
        """
        self.set_state(None)
154 changes: 154 additions & 0 deletions durabletask/entities/entity_context.py
@@ -0,0 +1,154 @@

from typing import Any, Optional, Type, TypeVar, Union, overload
import uuid
from durabletask.entities.entity_instance_id import EntityInstanceId
from durabletask.internal import helpers, shared
from durabletask.internal.entity_state_shim import StateShim
import durabletask.internal.orchestrator_service_pb2 as pb

TState = TypeVar("TState")


class EntityContext:
    def __init__(self, orchestration_id: str, operation: str, state: StateShim, entity_id: EntityInstanceId):
        self._orchestration_id = orchestration_id
        self._operation = operation
        self._state = state
        self._entity_id = entity_id

    @property
    def orchestration_id(self) -> str:
        """Get the ID of the orchestration instance that scheduled this entity.

        Returns
        -------
        str
            The ID of the current orchestration instance.
        """
        return self._orchestration_id

    @property
    def operation(self) -> str:
        """Get the operation associated with this entity invocation.

        The operation is a string that identifies the specific action being
        performed on the entity. It can be used to distinguish between
        multiple operations that are part of the same entity invocation.

        Returns
        -------
        str
            The operation associated with this entity invocation.
        """
        return self._operation

    @overload
    def get_state(self, intended_type: Type[TState], default: TState) -> TState:
        ...

    @overload
    def get_state(self, intended_type: Type[TState]) -> Optional[TState]:
        ...

    @overload
    def get_state(self, intended_type: None = None, default: Any = None) -> Any:
        ...

    def get_state(self, intended_type: Optional[Type[TState]] = None, default: Optional[TState] = None) -> Union[None, TState, Any]:
        """Get the current state of the entity, optionally converting it to a specified type.

        Parameters
        ----------
        intended_type : Type[TState] | None, optional
            The type to which the state should be converted. If None, the state is returned as-is.
        default : TState, optional
            The default value to return if the state is not found or cannot be converted.

        Returns
        -------
        TState | Any
            The current state of the entity, optionally converted to the specified type.
        """
        return self._state.get_state(intended_type, default)

    def set_state(self, new_state: Any):
        """Set the state of the entity to a new value.

        Parameters
        ----------
        new_state : Any
            The new state to set for the entity.
        """
        self._state.set_state(new_state)

    def signal_entity(self, entity_instance_id: EntityInstanceId, operation: str, input: Optional[Any] = None) -> None:
        """Signal another entity to perform an operation.

        Parameters
        ----------
        entity_instance_id : EntityInstanceId
            The ID of the entity instance to signal.
        operation : str
            The operation to perform on the entity.
        input : Any, optional
            The input to provide to the entity for the operation.
        """
        encoded_input = shared.to_json(input) if input is not None else None
        self._state.add_operation_action(
            pb.OperationAction(
                sendSignal=pb.SendSignalAction(
                    instanceId=str(entity_instance_id),
                    name=operation,
                    input=helpers.get_string_value(encoded_input),
                    scheduledTime=None,
                    requestTime=None,
                    parentTraceContext=None,
                )
            )
        )

    def schedule_new_orchestration(self, orchestration_name: str, input: Optional[Any] = None, instance_id: Optional[str] = None) -> str:
        """Schedule a new orchestration instance.

        Parameters
        ----------
        orchestration_name : str
            The name of the orchestration to schedule.
        input : Any, optional
            The input to provide to the new orchestration.
        instance_id : str, optional
            The instance ID to assign to the new orchestration. If None, a new ID will be generated.

        Returns
        -------
        str
            The instance ID of the scheduled orchestration.
        """
        encoded_input = shared.to_json(input) if input is not None else None
        if not instance_id:
            instance_id = uuid.uuid4().hex
        self._state.add_operation_action(
            pb.OperationAction(
                startNewOrchestration=pb.StartNewOrchestrationAction(
                    instanceId=instance_id,
                    name=orchestration_name,
                    input=helpers.get_string_value(encoded_input),
                    version=None,
                    scheduledTime=None,
                    requestTime=None,
                    parentTraceContext=None
                )
            )
        )
        return instance_id

    @property
    def entity_id(self) -> EntityInstanceId:
        """Get the ID of the entity instance.

        Returns
        -------
        EntityInstanceId
            The ID of the current entity instance.
        """
        return self._entity_id