Skip to content

Commit

Permalink
Merge pull request #329 from TeskaLabs/feature/multi-url-webhook
Browse files Browse the repository at this point in the history
Storage: Accept multiple webhook URIs
  • Loading branch information
byewokko authored Jan 6, 2023
2 parents 18ca098 + 58a998c commit 5c5c799
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 19 deletions.
4 changes: 2 additions & 2 deletions asab/storage/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ async def execute(self, custom_data: typing.Optional[dict] = None):
# pass
# obj[k] = o

if self.Storage.WebhookURI is not None:
if self.Storage.WebhookURIs is not None:
webhook_data = {
"collection": self.Collection,
}
Expand All @@ -205,6 +205,6 @@ async def execute(self, custom_data: typing.Optional[dict] = None):
upsertor_data["unset"] = {k: v for k, v in self.ModUnset.items() if not k.startswith("__")}
webhook_data["upsertor"] = upsertor_data

await self._webhook(webhook_data)
await self.webhook(webhook_data)

return self.ObjId
17 changes: 15 additions & 2 deletions asab/storage/service.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import abc
import secrets
import hashlib

import logging
import asab
import re

try:
import cryptography.hazmat.primitives.ciphers
Expand All @@ -11,6 +12,12 @@
except ModuleNotFoundError:
cryptography = None

#

L = logging.getLogger(__name__)

#


ENCRYPTED_PREFIX = b"$aes-cbc$"

Expand All @@ -19,7 +26,13 @@ class StorageServiceABC(asab.Service):

def __init__(self, app, service_name):
super().__init__(app, service_name)
self.WebhookURI = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
if self.WebhookURIs is not None:
self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0]
try:
self.ProactorService = app.get_service("asab.ProactorService")
except KeyError as e:
raise Exception("Storage webhooks require ProactorService") from e
self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None

# Specify a non-empty AES key to enable AES encryption of selected fields
Expand Down
40 changes: 25 additions & 15 deletions asab/storage/upsertor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import hashlib
import datetime
import logging
import aiohttp
import asab.web.rest.json
import requests
import typing

#
Expand Down Expand Up @@ -43,7 +43,7 @@ def __init__(self, storage, collection, obj_id, version=None):
self.ModPush = {}
self.ModPull = {}

self.WebhookResponseData = None
self.WebhookResponseData = {}


def get_id_name(self):
Expand Down Expand Up @@ -117,14 +117,14 @@ async def execute(self, custom_data: typing.Optional[dict] = None):
```python
upsertor = storage_service.upsertor("users")
upsertor.set("name", "Raccoon")
await upsertor.execute(custom_data={"action": "user_creation"})
await upsertor.execute(custom_data={"event_type": "create_user"})
```
will trigger a webhook whose payload may look like this:
```json
{
"collection": "users",
"custom": {"action": "user_creation"},
"custom": {"event_type": "create_user"},
"upsertor": {
"id": "2O-h3ulpO-ZwDrkSbQlYB3pYS0JJxCJj3nr6uQAu8aU",
"id_field_name": "_id",
Expand All @@ -143,21 +143,31 @@ async def execute(self, custom_data: typing.Optional[dict] = None):
pass


async def _webhook(self, data: dict):
assert self.Storage.WebhookURI is not None
async def webhook(self, data: dict):
assert self.Storage.WebhookURIs is not None
json_dump = asab.web.rest.json.JSONDumper(pretty=False)(data)
for uri in self.Storage.WebhookURIs:
self.WebhookResponseData[uri] = await self.Storage.ProactorService.execute(
self._webhook, json_dump, uri, self.Storage.WebhookAuth)



def _webhook(self, data, uri, auth=None):
try:
async with aiohttp.ClientSession(auth=self.Storage.WebhookAuth) as session:
async with session.put(
self.Storage.WebhookURI,
data=json_dump,
with requests.Session() as session:
if self.Storage.WebhookAuth:
session.headers["Authorization"] = self.Storage.WebhookAuth
with session.put(
uri,
data=data,
headers={"Content-Type": "application/json"}
) as response:
if response.status // 100 != 2:
text = await response.text()
L.error("Webhook endpoint responded with {}:\n{}".format(response.status, text))
return
self.WebhookResponseData = await response.json()
if response.status_code // 100 != 2:
text = response.text
L.error(
"Webhook endpoint responded with {}:\n{}".format(response.status_code, text),
struct_data={"uri": uri})
return response.json()
except json.decoder.JSONDecodeError as e:
L.error("Failed to decode JSON response from webhook: {}".format(str(e)))
except Exception as e:
Expand Down

0 comments on commit 5c5c799

Please sign in to comment.