-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnotifications.py
126 lines (98 loc) · 3.94 KB
/
notifications.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import logging
from typing import Dict, List
import configparser
from pathlib import Path
import json
import ntfy
logger = logging.getLogger("notification_manager")
config_file = Path.home() / ".nofconfig"
class _NotificationChannel(object):
"""
Interface for new channels.
New channels only *need* to overwrite "_send"
If a persistent connection is used, optionally "_connect" as well
If the message should be formatted differently for a channel, overwrite "_pprint"
"""
def __init__(self):
logger.debug(f"{self.__class__.__name__} instantiated")
config = configparser.ConfigParser()
config.read(config_file)
self.config: Dict[str, str] = config[str(self.__class__.__name__)[1:]]
self._connect()
def _connect(self) -> None:
pass
def _pprint(self, event: Dict) -> str:
"""
formats the kubernetes event into a nicely readable string
"""
return f"{event['type']}: {event['object'].kind} {event['object'].metadata.name} ({event['object'].metadata.owner_references[0].kind} {event['object'].metadata.owner_references[0].name})"
def _send(self, msg: str) -> None:
raise NotImplementedError
def notify(self, event: Dict) -> None:
msg = self._pprint(event)
try:
self._send(msg)
except:
self._connect()
try:
self._send(msg)
except Exception as e:
logger.error(e)
class NotificationManager(object):
def __init__(self, notification_channels: List[str]):
self.notification_channels = [
self._getChannel(val) for val in notification_channels
]
logger.debug(f"found channels: {self.notification_channels}")
@staticmethod
def _getChannel(channelName: str) -> _NotificationChannel:
import notifications as notif
return getattr(notif, "_" + channelName)()
def notify(self, event) -> None:
for channel in self.notification_channels:
logger.debug(f"notifying {channel} about {event['type']}")
channel.notify(event)
# implement new channels below!
class _rocketchat(_NotificationChannel):
def _connect(self):
from rocketchat.api import RocketChatAPI
self.rc = RocketChatAPI(
settings={
"username": self.config["username"],
"password": self.config["password"],
"domain": self.config["server"],
}
)
logging.getLogger("base").setLevel(
logging.WARN
) # FIXME still logs on debug level
self.im_room = self.rc.create_im_room(self.config["yourUsername"])
def _send(self, message: str):
self.rc.send_message(message=message, room_id=self.im_room["id"])
class _elasticsearch(_NotificationChannel):
def _connect(self):
from elasticsearch import Elasticsearch, helpers
import ssl
from elasticsearch.connection import create_ssl_context
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
# ignore https errors:
ssl_context = create_ssl_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self.es = Elasticsearch(
[
f'{self.config["protocol"]}://{self.config["username"]}:{self.config["password"]}@{self.config["url"]}:{self.config["port"]}'
],
ssl_context=ssl_context,
)
def _pprint(self, event: Dict):
return event.to_dict()
def _send(self, message: Dict):
self.es.index(index=self.config["index"], body=message)
class _ntfy(_NotificationChannel):
def _send(self, msg: str) -> None:
ntfy.notify(msg, title="K8s notifier")