Skip to content

Commit

Permalink
[WEBHOOK]: Initial integration
Browse files Browse the repository at this point in the history
  • Loading branch information
amadolid committed Jan 7, 2025
1 parent 3b4bfbc commit 7565677
Show file tree
Hide file tree
Showing 16 changed files with 747 additions and 134 deletions.
15 changes: 11 additions & 4 deletions jac-cloud/jac_cloud/jaseci/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,17 @@ async def lifespan(app: _FaststAPI) -> AsyncGenerator[None, _FaststAPI]:

populate_yaml_specs(cls.__app__)

from .routers import healthz_router, sso_router, user_router
from ..plugin.jaseci import walker_router

for router in [healthz_router, sso_router, user_router, walker_router]:
from .routers import healthz_router, sso_router, user_router, webhook_router
from ..plugin.jaseci import walker_router, webhook_walker_router

for router in [
healthz_router,
sso_router,
user_router,
webhook_router,
walker_router,
webhook_walker_router,
]:
cls.__app__.include_router(router)

@cls.__app__.exception_handler(Exception)
Expand Down
18 changes: 14 additions & 4 deletions jac-cloud/jac_cloud/jaseci/datasources/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def keys(cls) -> list[bytes]:
return []

@classmethod
def set(cls, key: str, data: dict | bool) -> bool:
def set(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -115,7 +115,7 @@ def hkeys(cls) -> list[str]:
return []

@classmethod
def hset(cls, key: str, data: dict | bool) -> bool:
def hset(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair to group."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -169,6 +169,16 @@ class TokenRedis(Redis):
__table__ = "token"


class WebhookRedis(Redis):
"""Token Memory Interface.
This interface is for Token Management.
You may override this if you wish to implement different structure
"""

__table__ = "webhook"


class AsyncRedis:
"""
Base Memory interface.
Expand Down Expand Up @@ -225,7 +235,7 @@ async def keys(cls) -> list[bytes]:
return []

@classmethod
async def set(cls, key: str, data: dict | bool) -> bool:
async def set(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair."""
try:
redis = cls.get_rd()
Expand Down Expand Up @@ -266,7 +276,7 @@ async def hkeys(cls) -> list[str]:
return []

@classmethod
async def hset(cls, key: str, data: dict | bool) -> bool:
async def hset(cls, key: str, data: dict | bool | float) -> bool:
"""Push key value pair to group."""
try:
redis = cls.get_rd()
Expand Down
4 changes: 4 additions & 0 deletions jac-cloud/jac_cloud/jaseci/dtos/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
UserResetPassword,
UserVerification,
)
from .webhook import Expiration, GenerateKey, KeyIDs


__all__ = [
Expand All @@ -18,4 +19,7 @@
"UserRequest",
"UserResetPassword",
"UserVerification",
"Expiration",
"GenerateKey",
"KeyIDs",
]
31 changes: 31 additions & 0 deletions jac-cloud/jac_cloud/jaseci/dtos/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Jaseci User DTOs."""

from typing import Literal

from annotated_types import Len

from pydantic import BaseModel, Field, StringConstraints

from typing_extensions import Annotated


class Expiration(BaseModel):
"""Key Expiration."""

count: Annotated[int, Field(strict=True, gt=0, default=60)] = 60
interval: Literal["seconds", "minutes", "hours", "days"] = "days"


class GenerateKey(BaseModel):
"""User Creation Request Model."""

name: Annotated[str, StringConstraints(min_length=1)]
walkers: list[str] = Field(default_factory=list)
nodes: list[str] = Field(default_factory=list)
expiration: Expiration = Field(default_factory=Expiration)


class KeyIDs(BaseModel):
"""User Creation Request Model."""

ids: Annotated[list[str], Len(min_length=1)]
3 changes: 2 additions & 1 deletion jac-cloud/jac_cloud/jaseci/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Jaseci Models."""

from .user import NO_PASSWORD, User
from .webhook import Webhook

__all__ = ["NO_PASSWORD", "User"]
__all__ = ["NO_PASSWORD", "User", "Webhook"]
63 changes: 63 additions & 0 deletions jac-cloud/jac_cloud/jaseci/models/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
"""Jaseci Models."""

from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Generator, Mapping, cast

from bson import ObjectId

from ..datasources.collection import Collection as BaseCollection


@dataclass(kw_only=True)
class Webhook:
"""User Base Model."""

id: ObjectId = field(default_factory=ObjectId)
name: str
root_id: ObjectId
walkers: list[str]
nodes: list[str]
expiration: datetime
key: str

class Collection(BaseCollection["Webhook"]):
"""
User collection interface.
This interface is for User's Management.
You may override this if you wish to implement different structure
"""

__collection__ = "webhook"
__indexes__ = [
{"keys": ["name"], "unique": True},
{"keys": ["key"], "unique": True},
]

@classmethod
def __document__(cls, doc: Mapping[str, Any]) -> "Webhook":
"""
Return parsed Webhook from document.
This the default User parser after getting a single document.
You may override this to specify how/which class it will be casted/based.
"""
doc = cast(dict, doc)
return Webhook(id=doc.pop("_id"), **doc)

@classmethod
def find_by_root_id(cls, root_id: ObjectId) -> Generator["Webhook", None, None]:
"""Retrieve webhook via root_id."""
return cls.find({"root_id": root_id})

@classmethod
def find_by_key(cls, key: str) -> "Webhook | None":
"""Retrieve webhook via root_id."""
return cls.find_one({"key": key})

def __serialize__(self) -> dict:
"""Return serializable."""
data = asdict(self)
data["_id"] = data.pop("id")
return data
3 changes: 2 additions & 1 deletion jac-cloud/jac_cloud/jaseci/routers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
from .healthz import router as healthz_router
from .sso import router as sso_router
from .user import router as user_router
from .webhook import router as webhook_router

__all__ = ["healthz_router", "sso_router", "user_router"]
__all__ = ["healthz_router", "sso_router", "user_router", "webhook_router"]
166 changes: 166 additions & 0 deletions jac-cloud/jac_cloud/jaseci/routers/webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
"""Webhook APIs."""

from datetime import timedelta

from bson import ObjectId

from fastapi import APIRouter, Request, status
from fastapi.responses import ORJSONResponse

from pymongo.errors import ConnectionFailure, OperationFailure

from ..datasources.redis import WebhookRedis
from ..dtos import Expiration, GenerateKey, KeyIDs
from ..models import Webhook
from ..security import authenticator
from ..utils import logger, random_string, utc_datetime, utc_timestamp
from ...core.architype import BulkWrite

router = APIRouter(prefix="/webhook", tags=["webhook"])


@router.get("", status_code=status.HTTP_200_OK, dependencies=authenticator)
def get(req: Request) -> ORJSONResponse:
"""Get keys API."""
root_id: ObjectId = req._user.root_id # type: ignore[attr-defined]

return ORJSONResponse(
content={
"keys": [
{
"id": str(key.id),
"name": key.name,
"root_id": str(key.root_id),
"walkers": key.walkers,
"nodes": key.nodes,
"expiration": key.expiration,
"key": key.key,
}
for key in Webhook.Collection.find({"root_id": root_id})
]
}
)


@router.post(
"/generate-key", status_code=status.HTTP_201_CREATED, dependencies=authenticator
)
def generate_key(req: Request, gen_key: GenerateKey) -> ORJSONResponse:
"""Generate key API."""
root_id: ObjectId = req._user.root_id # type: ignore[attr-defined]

_exp: dict[str, int] = {gen_key.expiration.interval: gen_key.expiration.count}
exp = utc_datetime(**_exp)

webhook = Webhook(
name=gen_key.name,
root_id=root_id,
walkers=gen_key.walkers,
nodes=gen_key.nodes,
expiration=exp,
key=f"{root_id}:{utc_timestamp()}:{random_string(32)}",
)

if (
id := Webhook.Collection.insert_one(webhook.__serialize__()).inserted_id
) and WebhookRedis.hset(
webhook.key,
{
"walkers": webhook.walkers,
"nodes": webhook.nodes,
"expiration": webhook.expiration.timestamp(),
},
):
return ORJSONResponse(
content={"id": str(id), "name": webhook.name, "key": webhook.key},
status_code=201,
)

return ORJSONResponse(
content="Can't generate key at the moment. Please try again!", status_code=500
)


@router.patch(
"/extend/{id}", status_code=status.HTTP_201_CREATED, dependencies=authenticator
)
def extend(id: str, expiration: Expiration) -> ORJSONResponse:
"""Generate key API."""
with Webhook.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
try:
_id = ObjectId(id)
if webhook := Webhook.Collection.find_by_id(_id, session=session):
_exp: dict[str, int] = {expiration.interval: expiration.count}
webhook.expiration += timedelta(**_exp)

if Webhook.Collection.update_by_id(
_id, {"$set": {"expiration": webhook.expiration}}, session
).modified_count:
WebhookRedis.hset(
webhook.key,
{
"walkers": webhook.walkers,
"nodes": webhook.nodes,
"expiration": webhook.expiration.timestamp(),
},
)
BulkWrite.commit(session)
return ORJSONResponse(
{"message": "Successfully Extended!"}, 200
)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
retry += 1
logger.error(
f"Error executing transaction! Retrying [{retry}/{max_retry}] ..."
)
continue
logger.exception("Error executing transaction!")
session.abort_transaction()
break
except Exception:
logger.exception("Error executing transaction!")
session.abort_transaction()
break

return ORJSONResponse(
content="Can't extend key at the moment. Please try again!", status_code=500
)


@router.delete("/delete", status_code=status.HTTP_200_OK, dependencies=authenticator)
def delete(key_ids: KeyIDs) -> ORJSONResponse:
"""Delete keys API."""
with Webhook.Collection.get_session() as session, session.start_transaction():
retry = 0
max_retry = BulkWrite.SESSION_MAX_TRANSACTION_RETRY
while retry <= max_retry:
try:
if Webhook.Collection.delete(
{"_id": {"$in": [ObjectId(id) for id in key_ids.ids]}}, session
).deleted_count == len(key_ids.ids):
BulkWrite.commit(session)
return ORJSONResponse({"message": "Successfully Deleted!"}, 200)
break
except (ConnectionFailure, OperationFailure) as ex:
if ex.has_error_label("TransientTransactionError"):
retry += 1
logger.error(
f"Error executing transaction! Retrying [{retry}/{max_retry}] ..."
)
continue
logger.exception("Error executing transaction!")
session.abort_transaction()
break
except Exception:
logger.exception("Error executing transaction!")
session.abort_transaction()
break

return ORJSONResponse(
content="Error occured during deletion. Please try again!", status_code=500
)
Loading

0 comments on commit 7565677

Please sign in to comment.