Skip to content

Commit 2c328d1

Browse files
authored
Add logs to Dapr Workflows (#645)
1 parent 5c10b78 commit 2c328d1

File tree

7 files changed

+132
-10
lines changed

7 files changed

+132
-10
lines changed

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
2828
from dapr.conf import settings
2929
from dapr.conf.helpers import GrpcEndpoint
30+
from dapr.ext.workflow.logger import LoggerOptions, Logger
3031

3132
T = TypeVar('T')
3233
TInput = TypeVar('TInput')
@@ -43,19 +44,29 @@ class DaprWorkflowClient:
4344
application.
4445
"""
4546

46-
def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
47+
def __init__(
48+
self,
49+
host: Optional[str] = None,
50+
port: Optional[str] = None,
51+
logger_options: Optional[LoggerOptions] = None):
4752
address = getAddress(host, port)
4853

4954
try:
5055
uri = GrpcEndpoint(address)
5156
except ValueError as error:
5257
raise DaprInternalError(f'{error}') from error
5358

59+
self._logger = Logger("DaprWorkflowClient", logger_options)
60+
5461
metadata = tuple()
5562
if settings.DAPR_API_TOKEN:
5663
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
57-
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint, metadata=metadata,
58-
secure_channel=uri.tls)
64+
options = self._logger.get_options()
65+
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint,
66+
metadata=metadata,
67+
secure_channel=uri.tls,
68+
log_handler=options.log_handler,
69+
log_formatter=options.log_formatter)
5970

6071
def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] = None,
6172
instance_id: Optional[str] = None,

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow
2222
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
23+
from dapr.ext.workflow.logger import LoggerOptions, Logger
2324

2425
T = TypeVar('T')
2526
TInput = TypeVar('TInput')
@@ -29,8 +30,12 @@
2930
class DaprWorkflowContext(WorkflowContext):
3031
"""DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance."""
3132

32-
def __init__(self, ctx: task.OrchestrationContext):
33+
def __init__(
34+
self,
35+
ctx: task.OrchestrationContext,
36+
logger_options: Optional[LoggerOptions] = None):
3337
self.__obj = ctx
38+
self._logger = Logger("DaprWorkflowContext", logger_options)
3439

3540
# provide proxy access to regular attributes of wrapped object
3641
def __getattr__(self, name):
@@ -49,10 +54,12 @@ def is_replaying(self) -> bool:
4954
return self.__obj.is_replaying
5055

5156
def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
57+
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
5258
return self.__obj.create_timer(fire_at)
5359

5460
def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
5561
input: TInput = None) -> task.Task[TOutput]:
62+
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
5663
if hasattr(activity, '_dapr_alternate_name'):
5764
return self.__obj.call_activity(activity=activity.__dict__['_dapr_alternate_name'],
5865
input=input)
@@ -62,8 +69,10 @@ def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TO
6269
def call_child_workflow(self, workflow: Workflow, *,
6370
input: Optional[TInput],
6471
instance_id: Optional[str]) -> task.Task[TOutput]:
72+
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')
73+
6574
def wf(ctx: task.OrchestrationContext, inp: TInput):
66-
daprWfContext = DaprWorkflowContext(ctx)
75+
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
6776
return workflow(daprWfContext, inp)
6877
# copy workflow name so durabletask.worker can find the orchestrator in its registry
6978

@@ -75,9 +84,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
7584
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)
7685

7786
def wait_for_external_event(self, name: str) -> task.Task:
87+
self._logger.debug(f'{self.instance_id}: Waiting for external event {name}')
7888
return self.__obj.wait_for_external_event(name)
7989

8090
def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
91+
self._logger.debug(f'{self.instance_id}: Continuing as new')
8192
self.__obj.continue_as_new(new_input, save_events=save_events)
8293

8394

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from dapr.ext.workflow.logger.options import LoggerOptions
2+
from dapr.ext.workflow.logger.logger import Logger
3+
4+
__all__ = [
5+
'LoggerOptions',
6+
'Logger'
7+
]
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import logging
2+
from typing import Union
3+
from dapr.ext.workflow.logger.options import LoggerOptions
4+
5+
6+
class Logger:
7+
def __init__(self,
8+
name: str,
9+
options: Union[LoggerOptions, None] = None):
10+
# If options is None, then create a new LoggerOptions object
11+
if options is None:
12+
options = LoggerOptions()
13+
log_handler = options.log_handler
14+
log_handler.setLevel(options.log_level)
15+
log_handler.setFormatter(options.log_formatter)
16+
logger = logging.getLogger(name)
17+
logger.handlers.append(log_handler)
18+
self._logger_options = options
19+
self._logger = logger
20+
21+
def get_options(self) -> LoggerOptions:
22+
return self._logger_options
23+
24+
def debug(self, msg, *args, **kwargs):
25+
self._logger.debug(msg, *args, **kwargs)
26+
27+
def info(self, msg, *args, **kwargs):
28+
self._logger.info(msg, *args, **kwargs)
29+
30+
def warning(self, msg, *args, **kwargs):
31+
self._logger.warning(msg, *args, **kwargs)
32+
33+
def error(self, msg, *args, **kwargs):
34+
self._logger.error(msg, *args, **kwargs)
35+
36+
def critical(self, msg, *args, **kwargs):
37+
self._logger.critical(msg, *args, **kwargs)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
3+
"""
4+
Copyright 2023 The Dapr Authors
5+
Licensed under the Apache License, Version 2.0 (the "License");
6+
you may not use this file except in compliance with the License.
7+
You may obtain a copy of the License at
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
"""
15+
16+
from typing import Union
17+
import logging
18+
19+
20+
class LoggerOptions:
21+
def __init__(
22+
self,
23+
log_level: Union[str, None] = None,
24+
log_handler: Union[logging.Handler, None] = None,
25+
log_formatter: Union[logging.Formatter, None] = None,
26+
):
27+
# Set default log level to INFO if none is provided
28+
if log_level is None:
29+
log_level = logging.INFO
30+
# Add a default log handler if none is provided
31+
if log_handler is None:
32+
log_handler = logging.StreamHandler()
33+
# Set a default log formatter if none is provided
34+
if log_formatter is None:
35+
log_formatter = logging.Formatter(
36+
fmt="%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s",
37+
datefmt='%Y-%m-%d %H:%M:%S')
38+
self.log_level = log_level
39+
self.log_handler = log_handler
40+
self.log_formatter = log_formatter

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
2929
from dapr.conf import settings
3030
from dapr.conf.helpers import GrpcEndpoint
31+
from dapr.ext.workflow.logger import LoggerOptions, Logger
3132

3233
T = TypeVar('T')
3334
TInput = TypeVar('TInput')
@@ -38,7 +39,12 @@ class WorkflowRuntime:
3839
"""WorkflowRuntime is the entry point for registering workflows and activities.
3940
"""
4041

41-
def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
42+
def __init__(
43+
self,
44+
host: Optional[str] = None,
45+
port: Optional[str] = None,
46+
logger_options: Optional[LoggerOptions] = None):
47+
self._logger = Logger("WorkflowRuntime", logger_options)
4248
metadata = tuple()
4349
if settings.DAPR_API_TOKEN:
4450
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
@@ -49,13 +55,19 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
4955
except ValueError as error:
5056
raise DaprInternalError(f'{error}') from error
5157

52-
self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint, metadata=metadata,
53-
secure_channel=uri.tls)
58+
options = self._logger.get_options()
59+
self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint,
60+
metadata=metadata,
61+
secure_channel=uri.tls,
62+
log_handler=options.log_handler,
63+
log_formatter=options.log_formatter)
5464

5565
def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
66+
self._logger.info(f"Registering workflow '{fn.__name__}' with runtime")
67+
5668
def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
57-
"""Responsible to call Workflow function in orchestrationWrapper."""
58-
daprWfContext = DaprWorkflowContext(ctx)
69+
"""Responsible to call Workflow function in orchestrationWrapper"""
70+
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
5971
if inp is None:
6072
return fn(daprWfContext)
6173
return fn(daprWfContext, inp)
@@ -80,6 +92,8 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None):
8092
"""Registers a workflow activity as a function that takes
8193
a specified input type and returns a specified output type.
8294
"""
95+
self._logger.info(f"Registering activity '{fn.__name__}' with runtime")
96+
8397
def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
8498
"""Responsible to call Activity function in activityWrapper"""
8599
wfActivityContext = WorkflowActivityContext(ctx)

ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828

2929

3030
class FakeOrchestrationContext:
31+
def __init__(self):
32+
self.instance_id = mock_instance_id
3133

3234
def create_timer(self, fire_at):
3335
return mock_create_timer

0 commit comments

Comments
 (0)