Skip to content

Commit

Permalink
Add logs to Dapr Workflows (dapr#645)
Browse files Browse the repository at this point in the history
  • Loading branch information
shivamkm07 authored and litan1106 committed Jan 10, 2024
1 parent 21bc8e1 commit d042153
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 6 deletions.
18 changes: 17 additions & 1 deletion ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.ext.workflow.logger import LoggerOptions, Logger

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -43,20 +44,35 @@ class DaprWorkflowClient:
application.
"""

def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
def __init__(
self,
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None):
address = getAddress(host, port)

try:
uri = GrpcEndpoint(address)
except ValueError as error:
raise DaprInternalError(f'{error}') from error

self._logger = Logger("DaprWorkflowClient", logger_options)

metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
<<<<<<< HEAD
self.__obj = client.TaskHubGrpcClient(
host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls
)
=======
options = self._logger.get_options()
self.__obj = client.TaskHubGrpcClient(host_address=uri.endpoint,
metadata=metadata,
secure_channel=uri.tls,
log_handler=options.log_handler,
log_formatter=options.log_formatter)
>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))

def schedule_new_workflow(
self,
Expand Down
26 changes: 24 additions & 2 deletions ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

from dapr.ext.workflow.workflow_context import WorkflowContext, Workflow
from dapr.ext.workflow.workflow_activity_context import WorkflowActivityContext
from dapr.ext.workflow.logger import LoggerOptions, Logger

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -29,8 +30,12 @@
class DaprWorkflowContext(WorkflowContext):
"""DaprWorkflowContext that provides proxy access to internal OrchestrationContext instance."""

def __init__(self, ctx: task.OrchestrationContext):
def __init__(
self,
ctx: task.OrchestrationContext,
logger_options: Optional[LoggerOptions] = None):
self.__obj = ctx
self._logger = Logger("DaprWorkflowContext", logger_options)

# provide proxy access to regular attributes of wrapped object
def __getattr__(self, name):
Expand All @@ -49,26 +54,41 @@ def is_replaying(self) -> bool:
return self.__obj.is_replaying

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time')
return self.__obj.create_timer(fire_at)

<<<<<<< HEAD
def call_activity(
self,
activity: Callable[[WorkflowActivityContext, TInput], TOutput],
*,
input: TInput = None,
) -> task.Task[TOutput]:
=======
def call_activity(self, activity: Callable[[WorkflowActivityContext, TInput], TOutput], *,
input: TInput = None) -> task.Task[TOutput]:
self._logger.debug(f'{self.instance_id}: Creating activity {activity.__name__}')
>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))
if hasattr(activity, '_dapr_alternate_name'):
return self.__obj.call_activity(
activity=activity.__dict__['_dapr_alternate_name'], input=input
)
# this return should ideally never execute
return self.__obj.call_activity(activity=activity.__name__, input=input)

<<<<<<< HEAD
def call_child_workflow(
self, workflow: Workflow, *, input: Optional[TInput], instance_id: Optional[str]
) -> task.Task[TOutput]:
=======
def call_child_workflow(self, workflow: Workflow, *,
input: Optional[TInput],
instance_id: Optional[str]) -> task.Task[TOutput]:
self._logger.debug(f'{self.instance_id}: Creating child workflow {workflow.__name__}')

>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))
def wf(ctx: task.OrchestrationContext, inp: TInput):
daprWfContext = DaprWorkflowContext(ctx)
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
return workflow(daprWfContext, inp)

# copy workflow name so durabletask.worker can find the orchestrator in its registry
Expand All @@ -81,9 +101,11 @@ def wf(ctx: task.OrchestrationContext, inp: TInput):
return self.__obj.call_sub_orchestrator(wf, input=input, instance_id=instance_id)

def wait_for_external_event(self, name: str) -> task.Task:
self._logger.debug(f'{self.instance_id}: Waiting for external event {name}')
return self.__obj.wait_for_external_event(name)

def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
self._logger.debug(f'{self.instance_id}: Continuing as new')
self.__obj.continue_as_new(new_input, save_events=save_events)


Expand Down
7 changes: 7 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/logger/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from dapr.ext.workflow.logger.options import LoggerOptions
from dapr.ext.workflow.logger.logger import Logger

__all__ = [
'LoggerOptions',
'Logger'
]
37 changes: 37 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/logger/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging
from typing import Union
from dapr.ext.workflow.logger.options import LoggerOptions


class Logger:
def __init__(self,
name: str,
options: Union[LoggerOptions, None] = None):
# If options is None, then create a new LoggerOptions object
if options is None:
options = LoggerOptions()
log_handler = options.log_handler
log_handler.setLevel(options.log_level)
log_handler.setFormatter(options.log_formatter)
logger = logging.getLogger(name)
logger.handlers.append(log_handler)
self._logger_options = options
self._logger = logger

def get_options(self) -> LoggerOptions:
return self._logger_options

def debug(self, msg, *args, **kwargs):
self._logger.debug(msg, *args, **kwargs)

def info(self, msg, *args, **kwargs):
self._logger.info(msg, *args, **kwargs)

def warning(self, msg, *args, **kwargs):
self._logger.warning(msg, *args, **kwargs)

def error(self, msg, *args, **kwargs):
self._logger.error(msg, *args, **kwargs)

def critical(self, msg, *args, **kwargs):
self._logger.critical(msg, *args, **kwargs)
40 changes: 40 additions & 0 deletions ext/dapr-ext-workflow/dapr/ext/workflow/logger/options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-

"""
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from typing import Union
import logging


class LoggerOptions:
def __init__(
self,
log_level: Union[str, None] = None,
log_handler: Union[logging.Handler, None] = None,
log_formatter: Union[logging.Formatter, None] = None,
):
# Set default log level to INFO if none is provided
if log_level is None:
log_level = logging.INFO
# Add a default log handler if none is provided
if log_handler is None:
log_handler = logging.StreamHandler()
# Set a default log formatter if none is provided
if log_formatter is None:
log_formatter = logging.Formatter(
fmt="%(asctime)s.%(msecs)03d %(name)s %(levelname)s: %(message)s",
datefmt='%Y-%m-%d %H:%M:%S')
self.log_level = log_level
self.log_handler = log_handler
self.log_formatter = log_formatter
27 changes: 24 additions & 3 deletions ext/dapr-ext-workflow/dapr/ext/workflow/workflow_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from dapr.clients.http.client import DAPR_API_TOKEN_HEADER
from dapr.conf import settings
from dapr.conf.helpers import GrpcEndpoint
from dapr.ext.workflow.logger import LoggerOptions, Logger

T = TypeVar('T')
TInput = TypeVar('TInput')
Expand All @@ -37,7 +38,12 @@
class WorkflowRuntime:
"""WorkflowRuntime is the entry point for registering workflows and activities."""

def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
def __init__(
self,
host: Optional[str] = None,
port: Optional[str] = None,
logger_options: Optional[LoggerOptions] = None):
self._logger = Logger("WorkflowRuntime", logger_options)
metadata = tuple()
if settings.DAPR_API_TOKEN:
metadata = ((DAPR_API_TOKEN_HEADER, settings.DAPR_API_TOKEN),)
Expand All @@ -48,14 +54,25 @@ def __init__(self, host: Optional[str] = None, port: Optional[str] = None):
except ValueError as error:
raise DaprInternalError(f'{error}') from error

<<<<<<< HEAD
self.__worker = worker.TaskHubGrpcWorker(
host_address=uri.endpoint, metadata=metadata, secure_channel=uri.tls
)
=======
options = self._logger.get_options()
self.__worker = worker.TaskHubGrpcWorker(host_address=uri.endpoint,
metadata=metadata,
secure_channel=uri.tls,
log_handler=options.log_handler,
log_formatter=options.log_formatter)
>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))

def register_workflow(self, fn: Workflow, *, name: Optional[str] = None):
self._logger.info(f"Registering workflow '{fn.__name__}' with runtime")

def orchestrationWrapper(ctx: task.OrchestrationContext, inp: Optional[TInput] = None):
"""Responsible to call Workflow function in orchestrationWrapper."""
daprWfContext = DaprWorkflowContext(ctx)
"""Responsible to call Workflow function in orchestrationWrapper"""
daprWfContext = DaprWorkflowContext(ctx, self._logger.get_options())
if inp is None:
return fn(daprWfContext)
return fn(daprWfContext, inp)
Expand All @@ -81,6 +98,10 @@ def register_activity(self, fn: Activity, *, name: Optional[str] = None):
"""Registers a workflow activity as a function that takes
a specified input type and returns a specified output type.
"""
<<<<<<< HEAD
=======
self._logger.info(f"Registering activity '{fn.__name__}' with runtime")
>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))

def activityWrapper(ctx: task.ActivityContext, inp: Optional[TInput] = None):
"""Responsible to call Activity function in activityWrapper"""
Expand Down
6 changes: 6 additions & 0 deletions ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@


class FakeOrchestrationContext:
<<<<<<< HEAD
=======
def __init__(self):
self.instance_id = mock_instance_id

>>>>>>> 2c328d1 (Add logs to Dapr Workflows (#645))
def create_timer(self, fire_at):
return mock_create_timer

Expand Down

0 comments on commit d042153

Please sign in to comment.