Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logs to Dapr Workflows #645

Merged
merged 12 commits into from
Jan 10, 2024
Empty file added dapr/logger/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions dapr/logger/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Union
import logging


class LoggerOptions:
    """Logging configuration for loggers created by the Dapr SDK.

    Bundles a log level, a handler, and a formatter, substituting
    defaults for any that are not provided.
    """

    def __init__(
        self,
        log_level: Union[int, str, None] = None,
        log_handler: Union[logging.Handler, None] = None,
        log_formatter: Union[logging.Formatter, None] = None,
    ):
        """Initialize options, filling in defaults for omitted values.

        Args:
            log_level: a level name or number accepted by
                ``logging.setLevel`` (e.g. ``'DEBUG'`` or ``logging.DEBUG``).
                Defaults to ``logging.INFO``. (Annotation widened to include
                ``int``: the default assigned below is the int constant
                ``logging.INFO``.)
            log_handler: handler to attach to created loggers. Defaults to
                a ``StreamHandler`` (stderr).
            log_formatter: formatter applied to the handler. Defaults to a
                ``"<timestamp>.<ms> <name> <LEVEL>: <message>"`` format.
        """
        # Set default log level to INFO if none is provided
        if log_level is None:
            log_level = logging.INFO
        # Add a default log handler if none is provided
        if log_handler is None:
            log_handler = logging.StreamHandler()
        # Set a default log formatter if none is provided
        if log_formatter is None:
            log_formatter = logging.Formatter(
                fmt="%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s",
                datefmt='%Y-%m-%d %H:%M:%S')
        self.log_level = log_level
        self.log_handler = log_handler
        self.log_formatter = log_formatter

    def get_logger(self, name: str) -> logging.Logger:
        """Create a new ``logging.Logger`` configured with these options.

        The logger is instantiated directly (not via ``logging.getLogger``)
        so it is detached from the global logger registry and does not
        propagate to the root logger; each call returns a fresh instance
        sharing this object's handler.

        Args:
            name: name embedded in emitted log records.

        Returns:
            A logger with ``log_level`` applied and ``log_handler`` attached.
        """
        logger = logging.Logger(name)
        # Fix: also set the level on the logger itself. Previously only the
        # handler was leveled, leaving the logger at NOTSET so every record
        # (e.g. DEBUG) was created and dispatched before the handler dropped
        # it. Output is identical; filtering now happens at the source.
        logger.setLevel(self.log_level)
        log_handler = self.log_handler
        log_handler.setLevel(self.log_level)
        log_handler.setFormatter(self.log_formatter)
        logger.handlers.append(log_handler)
        return logger
18 changes: 15 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.logger.options import LoggerOptions

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -43,19 +44,30 @@ class DaprWorkflowClient:
application.
"""

def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
# Construct a workflow client connected to the sidecar's workflow gRPC
# endpoint. `logger_options` was added with a None default, so the change
# is backward compatible with existing callers.
def __init__(
self,
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None):
# Resolve host/port against configured defaults.
address = getAddress(host, port)

try:
uri = GrpcEndpoint(address)
except ValueError as error:
# Surface endpoint-parsing failures as a Dapr-specific error.
raise DaprInternalError(f'{error}') from error

# Fall back to default logger options (INFO level, stream handler).
if logger_options is None:
logger_options = LoggerOptions()
self._logger = logger_options.get_logger("DaprWorkflowClient")

# Attach the Dapr API token header only when one is configured.
metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
# NOTE: diff view — the next two lines are the pre-change constructor
# call; the call below replaces them.
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint, metadata=metadata,
secure_channel=uri.tls)
# Pass the same handler/formatter to the underlying TaskHubGrpcClient so
# its log output matches the SDK's configured format.
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint,
metadata=metadata,
secure_channel=uri.tls,
log_handler=logger_options.log_handler,
log_formatter=logger_options.log_formatter)

def schedule_new_workflow(self, workflow: Workflow, *, input: Optional[TInput] = None,
instance_id: Optional[str] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
from dapr.logger.options import LoggerOptions

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -29,8 +30,15 @@
class DaprWorkflowContext(WorkflowContext):
"""DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance."""

def __init__(self, ctx: task.OrchestrationContext):
def __init__(
self,
ctx: task.OrchestrationContext,
logger_options: Optional[LoggerOptions] = None):
"""Wrap an OrchestrationContext; attribute access is proxied to it
via __getattr__ below. `logger_options` defaults to LoggerOptions()
so existing callers are unaffected."""
self.__obj = ctx
if logger_options is None:
logger_options = LoggerOptions()
# Kept so child workflow contexts can be created with the same options.
self._logger_options = logger_options
self._logger = logger_options.get_logger("DaprWorkflowContext")

# provide proxy access to regular attributes of wrapped object
def __getattr__(self, name):
Expand All @@ -49,26 +57,33 @@
return self.__obj.is_replaying

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
"""Log the timer request at DEBUG, then delegate to the wrapped context."""
self._logger.debug(f'instance_id {self.instance_id} create_timer: {fire_at}')
return self.__obj.create_timer(fire_at)

def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
"""Log at DEBUG, then schedule the activity by its function name."""
self._logger.debug(f'instance_id {self.instance_id} call_activity: {activity.__name__}')
# The activity is registered by name, so only __name__ is passed through.
return self.__obj.call_activity(activity=activity.__name__, input=input)

def call_child_workflow(self, workflow: Workflow, *,
input: Optional[TInput],
instance_id: Optional[str]) -> task.Task[TOutput]:
"""Schedule `workflow` as a sub-orchestration of this workflow."""
# NOTE(review): the backslash continuation below embeds the next line's
# leading text directly into the log message, so the emitted message
# contains the literal run of spaces before "call_child_workflow".
self._logger.debug(f'instance_id {self.instance_id} \
call_child_workflow: {workflow.__name__}')

# Adapter so the child workflow also receives a DaprWorkflowContext
# (constructed with the same logger options as this context).
def wf(ctx: task.OrchestrationContext, inp: TInput):
daprWfContext = DaprWorkflowContext(ctx, self._logger_options)

Check warning on line 75 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L75

Added line #L75 was not covered by tests
return workflow(daprWfContext, inp)
# copy workflow name so durabletask.worker can find the orchestrator in its registry
wf.__name__ = workflow.__name__
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
"""Log at DEBUG, then wait for the named external event via the wrapped context."""
self._logger.debug(f'instance_id {self.instance_id} wait_for_external_event: {name}')

Check warning on line 82 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L82

Added line #L82 was not covered by tests
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
return self.__obj.wait_for_external_event(name)

def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
"""Log at DEBUG, then restart the orchestration with `new_input`.

Returns None: the wrapped call's result is deliberately not returned.
"""
self._logger.debug(f'instance_id {self.instance_id} continue_as_new')

Check warning on line 86 in ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py#L86

Added line #L86 was not covered by tests
self.__obj.continue_as_new(new_input, save_events=save_events)


Expand Down
24 changes: 20 additions & 4 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.logger.options import LoggerOptions

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -36,7 +37,15 @@
"""WorkflowRuntime is the entry point for registering workflows and activities.
"""

def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
def __init__(
self,
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None):
if logger_options is None:
logger_options = LoggerOptions()
self._logger_options = logger_options
self._logger = logger_options.get_logger("WorkflowRuntime")
metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
Expand All @@ -47,13 +56,18 @@
except ValueError as error:
raise DaprInternalError(f'{error}') from error

self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint, metadata=metadata,
secure_channel=uri.tls)
self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint,
metadata=metadata,
secure_channel=uri.tls,
log_handler=logger_options.log_handler,
log_formatter=logger_options.log_formatter)

def register_workflow(self, fn: Workflow):
self._logger.info(f"Registering workflow '{fn.__name__}' with runtime")

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper"""
daprWfContext = DaprWorkflowContext(ctx)
daprWfContext = DaprWorkflowContext(ctx, self._logger_options)

Check warning on line 70 in ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py

View check run for this annotation

Codecov / codecov/patch

ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py#L70

Added line #L70 was not covered by tests
if inp is None:
return fn(daprWfContext)
return fn(daprWfContext, inp)
Expand All @@ -64,6 +78,8 @@
"""Registers a workflow activity as a function that takes
a specified input type and returns a specified output type.
"""
self._logger.info(f"Registering activity '{fn.__name__}' with runtime")

def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
"""Responsible to call Activity function in activityWrapper"""
wfActivityContext = WorkflowActivityContext(ctx)
Expand Down
2 changes: 2 additions & 0 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@


class FakeOrchestrationContext:
def __init__(self):
# Expose a fixed instance_id so the context's debug logs (which format
# self.instance_id) work against this fake during tests.
self.instance_id = mock_instance_id

def create_timer(self, fire_at):
# Stub: return the sentinel instead of creating a real timer.
return mock_create_timer
Expand Down
Loading