diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 73eadc6f1..75ef2590d 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -27,6 +27,7 @@ from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint +from dapr.ext.workflow.logger import LoggerOptions, Logger T = TypeVar('T') TInput = TypeVar('TInput') @@ -43,7 +44,11 @@ class DaprWorkflowClient: application. """ - def __init__(self, host: Optional[str] = None, port: Optional[str] = None): + def __init__( + self, + host: Optional[str] = None, + port: Optional[str] = None, + logger_options: Optional[LoggerOptions] = None): address = getAddress(host, port) try: @@ -51,12 +56,23 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None): except ValueError as error: raise DaprInternalError(f'{error}') from error + self._logger = Logger("DaprWorkflowClient", logger_options) + metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) + options = self._logger.get_options() + self.__obj = client.TaskHubGrpcClient( + host_address=uri.endpoint, + metadata=metadata, + secure_channel=uri.tls, + log_handler=options.log_handler, + log_formatter=options.log_formatter, + ) def schedule_new_workflow( self, diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index a868c2e15..4f6871068 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ 
-20,6 +20,7 @@ from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext +from dapr.ext.workflow.logger import LoggerOptions, Logger T = TypeVar('T') TInput = TypeVar('TInput') @@ -29,8 +30,12 @@ class DaprWorkflowContext(WorkflowContext): """DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance.""" - def __init__(self, ctx: task.OrchestrationContext): + def __init__( + self, + ctx: task.OrchestrationContext, + logger_options: Optional[LoggerOptions] = None): self.__obj = ctx + self._logger = Logger("DaprWorkflowContext", logger_options) # provide proxy access to regular attributes of wrapped object def __getattr__(self, name): @@ -49,14 +54,21 @@ def is_replaying(self) -> bool: return self.__obj.is_replaying def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: + self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time') return self.__obj.create_timer(fire_at) def call_activity( self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *, input: TInput = None, ) -> task.Task[TOutput]: + self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}') if hasattr(activity, '_dapr_alternate_name'): return self.__obj.call_activity( activity=activity.__dict__['_dapr_alternate_name'], input=input @@ -64,11 +76,19 @@ def call_activity( # this return should ideally never execute return self.__obj.call_activity(activity=activity.__name__, input=input) def call_child_workflow( self, workflow: Workflow, *, input: Optional[TInput], instance_id: Optional[str] ) -> task.Task[TOutput]: + self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}') +
 def wf(ctx: task.OrchestrationContext, inp: TInput): - daprWfContext = DaprWorkflowContext(ctx) + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) return workflow(daprWfContext, inp) # copy workflow name so durabletask.worker can find the orchestrator in its registry @@ -81,9 +101,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput): return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id) def wait_for_external_event(self, name: str) -> task.Task: + self._logger.debug(f'{self.instance_id}: Waiting for external event {name}') return self.__obj.wait_for_external_event(name) def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None: + self._logger.debug(f'{self.instance_id}: Continuing as new') self.__obj.continue_as_new(new_input, save_events=save_events) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/__init__.py b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/__init__.py new file mode 100644 index 000000000..42284dffa --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/__init__.py @@ -0,0 +1,7 @@ +from dapr.ext.workflow.logger.options import LoggerOptions +from dapr.ext.workflow.logger.logger import Logger + +__all__ = [ + 'LoggerOptions', + 'Logger' +] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py new file mode 100644 index 000000000..ef320bda2 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py @@ -0,0 +1,37 @@ +import logging +from typing import Union +from dapr.ext.workflow.logger.options import LoggerOptions + + +class Logger: + def __init__(self, + name: str, + options: Union[LoggerOptions, None] = None): + # If options is None, then 
create a new LoggerOptions object + if options is None: + options = LoggerOptions() + log_handler = options.log_handler + log_handler.setLevel(options.log_level) + log_handler.setFormatter(options.log_formatter) + logger = logging.getLogger(name) + logger.handlers.append(log_handler) + self._logger_options = options + self._logger = logger + + def get_options(self) -> LoggerOptions: + return self._logger_options + + def debug(self, msg, *args, **kwargs): + self._logger.debug(msg, *args, **kwargs) + + def info(self, msg, *args, **kwargs): + self._logger.info(msg, *args, **kwargs) + + def warning(self, msg, *args, **kwargs): + self._logger.warning(msg, *args, **kwargs) + + def error(self, msg, *args, **kwargs): + self._logger.error(msg, *args, **kwargs) + + def critical(self, msg, *args, **kwargs): + self._logger.critical(msg, *args, **kwargs) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/logger/options.py b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/options.py new file mode 100644 index 000000000..46b499c10 --- /dev/null +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/logger/options.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2023 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +from typing import Union +import logging + + +class LoggerOptions: + def __init__( + self, + log_level: Union[str, None] = None, + log_handler: Union[logging.Handler, None] = None, + log_formatter: Union[logging.Formatter, None] = None, + ): + # Set default log level to INFO if none is provided + if log_level is None: + log_level = logging.INFO + # Add a default log handler if none is provided + if log_handler is None: + log_handler = logging.StreamHandler() + # Set a default log formatter if none is provided + if log_formatter is None: + log_formatter = logging.Formatter( + fmt="%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s", + datefmt='%Y-%m-%d %H:%M:%S') + self.log_level = log_level + self.log_handler = log_handler + self.log_formatter = log_formatter diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py index 4653ee265..762c772bf 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py @@ -28,6 +28,7 @@ from dapr.clients.http.client import DAPR_API_TOKEN_HEADER from dapr.conf import settings from dapr.conf.helpers import GrpcEndpoint +from dapr.ext.workflow.logger import LoggerOptions, Logger T = TypeVar('T') TInput = TypeVar('TInput') @@ -37,7 +38,12 @@ class WorkflowRuntime: """WorkflowRuntime is the entry point for registering workflows and activities.""" - def __init__(self, host: Optional[str] = None, port: Optional[str] = None): + def __init__( + self, + host: Optional[str] = None, + port: Optional[str] = None, + logger_options: Optional[LoggerOptions] = None): + self._logger = Logger("WorkflowRuntime", logger_options) metadata = tuple() if settings.DAPR_API_TOKEN: metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),) @@ -48,14 +54,25 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None): except ValueError as error: raise 
DaprInternalError(f'{error}') from error + options = self._logger.get_options() + self.__worker = worker.TaskHubGrpcWorker( + host_address=uri.endpoint, + metadata=metadata, + secure_channel=uri.tls, + log_handler=options.log_handler, + log_formatter=options.log_formatter, + ) def register_workflow(self, fn: Workflow, *, name: Optional[str] = None): + self._logger.info(f"Registering workflow '{fn.__name__}' with runtime") + def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None): - """Responsible to call Workflow function in orchestrationWrapper.""" - daprWfContext = DaprWorkflowContext(ctx) + """Responsible to call Workflow function in orchestrationWrapper""" + daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options()) if inp is None: return fn(daprWfContext) return fn(daprWfContext, inp) @@ -81,6 +98,10 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None): """Registers a workflow activity as a function that takes a specified input type and returns a specified output type. 
""" +<<<<<<< HEAD +======= + self._logger.info(f"Registering activity '{fn.__name__}' with runtime") +>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645)) def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None): """Responsible to call Activity function in activityWrapper""" diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index 62a106261..43bcc23e4 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -28,6 +28,12 @@ class FakeOrchestrationContext: +<<<<<<< HEAD +======= + def __init__(self): + self.instance_id = mock_instance_id + +>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645)) def create_timer(self, fire_at): return mock_create_timer