Skip to content

[WEBHOOK]: Initial integration #1392

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions jac-cloud/jac_cloud/jaseci/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,17 @@ async def lifespan(app: _FaststAPI) -> AsyncGenerator[None, _FaststAPI]:

populate_yaml_specs(cls.__app__)

from .routers import healthz_router, sso_router, user_router
from ..plugin.jaseci import walker_router

for router in [healthz_router, sso_router, user_router, walker_router]:
from .routers import healthz_router, sso_router, user_router, webhook_router
from ..plugin.jaseci import walker_router, webhook_walker_router

for router in [
healthz_router,
sso_router,
user_router,
webhook_router,
walker_router,
webhook_walker_router,
]:
cls.__app__.include_router(router)

@cls.__app__.exception_handler(Exception)
Expand Down
18 changes: 14 additions & 4 deletions jac-cloud/jac_cloud/jaseci/datasources/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def keys(cls) -> list[bytes]:
return []

@classmethod
def set(cls, key: str, data: dict | bool) -> bool:
def set(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -115,7 +115,7 @@ def hkeys(cls) -> list[str]:
return []

@classmethod
def hset(cls, key: str, data: dict | bool) -> bool:
def hset(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair to group."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -169,6 +169,16 @@ class TokenRedis(Redis):
__table__ = "token"


class WebhookRedis(Redis):
"""Token Memory Interface.

This interface is for Token Management.
You may override this if you wish to implement different structure
"""

__table__ = "webhook"


class AsyncRedis:
"""
Base Memory interface.
Expand Down Expand Up @@ -225,7 +235,7 @@ async def keys(cls) -> list[bytes]:
return []

@classmethod
async def set(cls, key: str, data: dict | bool) -> bool:
async def set(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -266,7 +276,7 @@ async def hkeys(cls) -> list[str]:
return []

@classmethod
async def hset(cls, key: str, data: dict | bool) -> bool:
async def hset(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair to group."""
try:
redis = cls.get_rd()
Expand Down
4 changes: 4 additions & 0 deletions jac-cloud/jac_cloud/jaseci/dtos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
UserResetPassword,
UserVerification,
)
from .webhook import Expiration, GenerateKey, KeyIDs


__all__ = [
Expand All @@ -18,4 +19,7 @@
"UserRequest",
"UserResetPassword",
"UserVerification",
"Expiration",
"GenerateKey",
"KeyIDs",
]
31 changes: 31 additions & 0 deletions jac-cloud/jac_cloud/jaseci/dtos/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Jaseci User DTOs."""

from typing import Literal

from annotated_types import Len

from pydantic import BaseModel, Field, StringConstraints

from typing_extensions import Annotated


class Expiration(BaseModel):
"""Key Expiration."""

count: Annotated[int, Field(strict=True, gt=0, default=60)] = 60
interval: Literal["seconds", "minutes", "hours", "days"] = "days"


class GenerateKey(BaseModel):
"""User Creation Request Model."""

name: Annotated[str, StringConstraints(min_length=1)]
walkers: list[str] = Field(default_factory=list)
nodes: list[str] = Field(default_factory=list)
expiration: Expiration = Field(default_factory=Expiration)


class KeyIDs(BaseModel):
"""User Creation Request Model."""

ids: Annotated[list[str], Len(min_length=1)]
3 changes: 2 additions & 1 deletion jac-cloud/jac_cloud/jaseci/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Jaseci Models."""

from .user import NO_PASSWORD, User
from .webhook import Webhook

__all__ = ["NO_PASSWORD", "User"]
__all__ = ["NO_PASSWORD", "User", "Webhook"]
63 changes: 63 additions & 0 deletions jac-cloud/jac_cloud/jaseci/models/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Jaseci Models."""

from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Generator, Mapping, cast

from bson import ObjectId

from ..datasources.collection import Collection as BaseCollection


@dataclass(kw_only=True)
class Webhook:
"""Webhook Base Model."""

id: ObjectId = field(default_factory=ObjectId)
name: str
root_id: ObjectId
walkers: list[str]
nodes: list[str]
expiration: datetime
key: str

class Collection(BaseCollection["Webhook"]):
"""
Webhook collection interface.

This interface is for Webhook Credentials Management.
You may override this if you wish to implement different structure
"""

__collection__ = "webhook"
__indexes__ = [
{"keys": ["name"], "unique": True},
{"keys": ["key"], "unique": True},
]

@classmethod
def __document__(cls, doc: Mapping[str, Any]) -> "Webhook":
"""
Return parsed Webhook from document.

This the default Webhook parser after getting a single document.
You may override this to specify how/which class it will be casted/based.
"""
doc = cast(dict, doc)
return Webhook(id=doc.pop("_id"), **doc)

@classmethod
def find_by_root_id(cls, root_id: ObjectId) -> Generator["Webhook", None, None]:
"""Retrieve webhook via root_id."""
return cls.find({"root_id": root_id})

@classmethod
def find_by_key(cls, key: str) -> "Webhook | None":
"""Retrieve webhook via root_id."""
return cls.find_one({"key": key})

def __serialize__(self) -> dict:
"""Return serializable."""
data = asdict(self)
data["_id"] = data.pop("id")
return data
3 changes: 2 additions & 1 deletion jac-cloud/jac_cloud/jaseci/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
from .healthz import router as healthz_router
from .sso import router as sso_router
from .user import router as user_router
from .webhook import router as webhook_router

__all__ = ["healthz_router", "sso_router", "user_router"]
__all__ = ["healthz_router", "sso_router", "user_router", "webhook_router"]
166 changes: 166 additions & 0 deletions jac-cloud/jac_cloud/jaseci/routers/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Webhook APIs."""

from datetime import timedelta

from bson import ObjectId

from fastapi import APIRouter, Request, status
from fastapi.responses import ORJSONResponse

from pymongo.errors import ConnectionFailure, OperationFailure

from ..datasources.redis import WebhookRedis
from ..dtos import Expiration, GenerateKey, KeyIDs
from ..models import Webhook
from ..security import authenticator
from ..utils import logger, random_string, utc_datetime, utc_timestamp
from ...core.architype import BulkWrite

router = APIRouter(prefix="/webhook", tags=["webhook"])


@router.get("", status_code=status.HTTP_200_OK, dependencies=authenticator)
def get(req: Request) -> ORJSONResponse:
"""Get keys API."""
root_id: ObjectId = req._user.root_id # type: ignore[attr-defined]

return ORJSONResponse(
content={
"keys": [
{
"id": str(key.id),
"name": key.name,
"root_id": str(key.root_id),
"walkers": key.walkers,
"nodes": key.nodes,
"expiration": key.expiration,
"key": key.key,
}
for key in Webhook.Collection.find({"root_id": root_id})
]
}
)


@router.post(
"/generate-key", status_code=status.HTTP_201_CREATED, dependencies=authenticator
)
def generate_key(req: Request, gen_key: GenerateKey) -> ORJSONResponse:
"""Generate key API."""
root_id: ObjectId = req._user.root_id # type: ignore[attr-defined]

_exp: dict[str, int] = {gen_key.expiration.interval: gen_key.expiration.count}
exp = utc_datetime(**_exp)

webhook = Webhook(
name=gen_key.name,
root_id=root_id,
walkers=gen_key.walkers,
nodes=gen_key.nodes,
expiration=exp,
key=f"{root_id}:{utc_timestamp()}:{random_string(32)}",
)

if (
id := Webhook.Collection.insert_one(webhook.__serialize__()).inserted_id
) and WebhookRedis.hset(
webhook.key,
{
"walkers": webhook.walkers,
"nodes": webhook.nodes,
"expiration": webhook.expiration.timestamp(),
},
):
return ORJSONResponse(
content={"id": str(id), "name": webhook.name, "key": webhook.key},
status_code=201,
)

return ORJSONResponse(
content="Can't generate key at the moment. Please try again!", status_code=500
)


@router.patch(
"/extend/{id}", status_code=status.HTTP_201_CREATED, dependencies=authenticator
)
def extend(id: str, expiration: Expiration) -> ORJSONResponse:
"""Generate key API."""
with Webhook.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
try:
_id = ObjectId(id)
if webhook := Webhook.Collection.find_by_id(_id, session=session):
_exp: dict[str, int] = {expiration.interval: expiration.count}
webhook.expiration += timedelta(**_exp)

if Webhook.Collection.update_by_id(
_id, {"$set": {"expiration": webhook.expiration}}, session
).modified_count:
WebhookRedis.hset(
webhook.key,
{
"walkers": webhook.walkers,
"nodes": webhook.nodes,
"expiration": webhook.expiration.timestamp(),
},
)
BulkWrite.commit(session)
return ORJSONResponse(
{"message": "Successfully Extended!"}, 200
)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
retry += 1
logger.error(
f"Error executing transaction! Retrying [{retry}/{max_retry}] ..."
)
continue
logger.exception("Error executing transaction!")
session.abort_transaction()
break
except Exception:
logger.exception("Error executing transaction!")
session.abort_transaction()
break

return ORJSONResponse(
content="Can't extend key at the moment. Please try again!", status_code=500
)


@router.delete("/delete", status_code=status.HTTP_200_OK, dependencies=authenticator)
def delete(key_ids: KeyIDs) -> ORJSONResponse:
"""Delete keys API."""
with Webhook.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
try:
if Webhook.Collection.delete(
{"_id": {"$in": [ObjectId(id) for id in key_ids.ids]}}, session
).deleted_count == len(key_ids.ids):
BulkWrite.commit(session)
return ORJSONResponse({"message": "Successfully Deleted!"}, 200)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
retry += 1
logger.error(
f"Error executing transaction! Retrying [{retry}/{max_retry}] ..."
)
continue
logger.exception("Error executing transaction!")
session.abort_transaction()
break
except Exception:
logger.exception("Error executing transaction!")
session.abort_transaction()
break

return ORJSONResponse(
content="Error occured during deletion. Please try again!", status_code=500
)
Loading
Loading