Skip to content

Commit

Permalink
Port master changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
riga committed Jan 5, 2025
1 parent f129881 commit 2319a07
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 30 deletions.
3 changes: 2 additions & 1 deletion law/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"NO_STR", "NO_INT", "NO_FLOAT", "is_no_param", "get_param", "Parameter",
"TaskInstanceParameter", "OptionalBoolParameter", "DurationParameter", "BytesParameter",
"CSVParameter", "MultiCSVParameter", "RangeParameter", "MultiRangeParameter", "NotifyParameter",
"NotifyMultiParameter", "NotifyMailParameter",
"NotifyMultiParameter", "NotifyMailParameter", "NotifyCustomParameter",
"Config",
"run", "no_value",
"notify_mail",
Expand Down Expand Up @@ -65,6 +65,7 @@
NO_STR, NO_INT, NO_FLOAT, is_no_param, get_param, Parameter, TaskInstanceParameter,
OptionalBoolParameter, DurationParameter, BytesParameter, CSVParameter, MultiCSVParameter,
RangeParameter, MultiRangeParameter, NotifyParameter, NotifyMultiParameter, NotifyMailParameter,
NotifyCustomParameter,
)
from law.target.file import (
FileSystemTarget, FileSystemFileTarget, FileSystemDirectoryTarget, localize_file_targets,
Expand Down
14 changes: 7 additions & 7 deletions law/contrib/mattermost/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ def __init__(self, *args, **kwargs) -> None:
"a Mattermost notification is sent once the task finishes"
)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}

@classmethod
def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) -> bool:
content = OrderedDict(content)
Expand Down Expand Up @@ -65,10 +72,3 @@ def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) ->

# send the notification
return notify_mattermost(title, content, **kwargs)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}
14 changes: 7 additions & 7 deletions law/contrib/slack/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ def __init__(self, *args, **kwargs) -> None:
"law.decorator.notify, a Slack notification is sent once the task finishes"
)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}

@classmethod
def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) -> bool:
# escape the full content
Expand Down Expand Up @@ -54,10 +61,3 @@ def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) ->

# send the notification
return notify_slack(title, content, attachment_color=color, **kwargs)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}
1 change: 1 addition & 0 deletions law/contrib/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class TransferLocalFile(Task):
)

exclude_index = True
exclude_params_repr_empty = {"source_path"}

def get_source_target(self) -> LocalFileTarget:
# when self.source_path is set, return a target around it
Expand Down
14 changes: 7 additions & 7 deletions law/contrib/telegram/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ def __init__(self, *args, **kwargs) -> None:
"law.decorator.notify, a Telegram notification is sent once the task finishes"
)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}

@staticmethod
def notify(success: bool, title: str, content: dict[str, str], **kwargs) -> bool:
# escape the full content
Expand All @@ -51,10 +58,3 @@ def notify(success: bool, title: str, content: dict[str, str], **kwargs) -> bool

# send the notification
return notify_telegram(title, content, **kwargs)

def get_transport(self) -> dict[str, Any]:
return {
"func": self.notify,
"raw": True,
"colored": False,
}
2 changes: 1 addition & 1 deletion law/decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,10 +441,10 @@ def send(error: Exception | None, transports: list[dict], t0: float) -> None:
status_string = "succeeded" if success else "failed"
title = "Task {} {}!".format(_task.get_task_family(), status_string)
parts = collections.OrderedDict([
("Task", str(_task)),
("Host", socket.gethostname()),
("Duration", duration),
("Last message", "-" if not len(_task._message_cache) else _task._message_cache[-1]),
("Task", str(_task)),
])
if not success:
parts["Traceback"] = traceback.format_exc()
Expand Down
60 changes: 59 additions & 1 deletion law/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@

from __future__ import annotations

__all__ = ["notify_mail"]
__all__ = ["notify_mail", "notify_custom"]

import importlib

from law.config import Config
from law.util import send_mail, uncolored
from law.logger import get_logger
from law._types import Any, Callable


logger = get_logger(__name__)
Expand Down Expand Up @@ -66,3 +69,58 @@ def notify_mail(
content=uncolored(message),
**mail_kwargs, # type: ignore[arg-type]
)


def notify_custom(
title: str,
content: dict[str, Any],
notify_func: Callable[[str, dict[str, Any], Any], Any] | str | None = None,
**kwargs,
) -> bool:
"""
Sends a notification with *title* and *content* using a custom *notify_func*. When *notify_func*
is empty, the configuration value "custom_func" in the [notifications] section is used. When it
is a string (which it will be when obtained from the config), it should have the format
``"module.id.func"``. The function is then imported and called with the *title* and *message*.
*True* is returned when the notification was sent successfully, *False* otherwise.
"""
# prepare the notify function
if not notify_func:
cfg = Config.instance()
notify_func = cfg.get_expanded("notifications", "custom_func", default=None)
if not notify_func:
logger.warning("cannot send custom notification, notify_func empty")
return False
if isinstance(notify_func, str):
try:
module_id, func_name = notify_func.rsplit(".", 1)
except ValueError:
logger.warning(
f"cannot send custom notification, notify_func '{notify_func}' has invalid format",
)
return False
try:
notify_module = importlib.import_module(module_id)
except ImportError:
logger.warning(f"cannot send custom notification, module '{module_id}' not found")
return False
notify_func = getattr(notify_module, func_name, None)
if not notify_func:
logger.warning(
f"cannot send custom notification, notify_func '{notify_func}' not found",
)
return False
if not callable(notify_func):
logger.warning(f"cannot send custom notification, notify_func '{notify_func}' not callable")
return False

# invoke it
try:
notify_func(title, content, **kwargs)
except TypeError:
logger.warning(
f"cannot send custom notification, notify_func '{notify_func}' has invalid signature",
)
return False

return True
65 changes: 60 additions & 5 deletions law/parameter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

import functools
import csv
from collections import OrderedDict

import luigi # type: ignore[import-untyped]

from law.notification import notify_mail
from law.notification import notify_mail, notify_custom
from law.util import (
human_duration, parse_duration, time_units, time_unit_aliases, human_bytes, parse_bytes,
byte_units, is_lazy_iterable, make_tuple, make_unique, brace_expand, range_expand, try_int,
Expand Down Expand Up @@ -887,19 +888,73 @@ def __init__(self, *args, **kwargs) -> None:
"email notification is sent once the task finishes"
)

def get_transport(self) -> dict[str, Any]:
""""""
return {
"func": self.notify,
"raw": True,
"colored": False,
}

@classmethod
def notify(cls, *args, **kwargs) -> bool:
def notify(cls, success: bool, title: str, content: dict[str, Any], **kwargs) -> bool:
""""""
return notify_mail(*args, **kwargs)
title, message = cls.format_message(success, title, content)
return notify_mail(title, message, **kwargs)

@classmethod
def format_message(cls, success: bool, title: str, content: dict[str, Any]) -> tuple[str, str]:
""""""
content = OrderedDict(content)

# status text
content["Status"] = "success" if success else "failure"
content.move_to_end("Status", last=False)

# markup for traceback
if "Traceback" in content:
content["Traceback"] = f"\n```\n{content['Traceback']}\n```"

# format message
message = "\n".join(f"**{k}**: {v}" for k, v in content.items())

return title, message


class NotifyCustomParameter(NotifyParameter):
"""
Notification parameter defining a custom notification transport.The *notify_func* argument can
be used to pass a custom notification function. When empty, the default implemented in
:py:meth:`law.notification.notify_custom` is used.
"""

def __init__(self, *args, **kwargs) -> None:
# store the notification attributes
self.raw = kwargs.pop("raw", True)
self.notify_func = kwargs.pop("notify_func", None)

super().__init__(*args, **kwargs)

if not self.description:
self.description = (
"when true, and the task's run method is decorated with law.decorator.notify, "
"a custom notification is sent once the task finishes"
)

def get_transport(self) -> dict[str, Any]:
""""""
return {
"func": self.notify,
"raw": False,
"func": functools.partial(self.notify, notify_func=self.notify_func),
"raw": self.raw,
"colored": False,
}

@classmethod
def notify(cls, success: bool, *args, **kwargs) -> bool:
""""""
# success is not forwarded, as the message content will have a field "Traceback" when failed
return notify_custom(*args, **kwargs)


# trailing imports
from law.task.base import Task
2 changes: 1 addition & 1 deletion law/task/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,7 @@ def _repr_params(self, all_params: bool = False) -> dict[str, Any]:
include = (
param.significant and
not multi_match(name, exclude) and
(name not in self.exclude_params_repr_empty or value)
(value not in (None, "NO_STR") or name not in self.exclude_params_repr_empty)
)
if include:
params[name] = value
Expand Down

0 comments on commit 2319a07

Please sign in to comment.