Skip to content

Commit

Permalink
[server] expose webhook_challenge_handler, webhook_update_handler
Browse files Browse the repository at this point in the history
… and `flow_request_handler`
  • Loading branch information
david-lev committed May 23, 2024
1 parent 5e595ba commit 06d3bb7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 155 deletions.
55 changes: 1 addition & 54 deletions pywa/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@
import mimetypes
import os
import pathlib
import threading
import warnings
from types import NoneType
from typing import BinaryIO, Iterable, Literal, Any, Callable

import requests

from . import utils, errors
from . import utils
from .api import WhatsAppCloudApi
from .handlers import Handler, HandlerDecorators, FlowRequestHandler # noqa
from .types import (
Expand Down Expand Up @@ -219,58 +218,6 @@ def _setup_api(
api_version=api_version,
)

def _delayed_register_callback_url(
self,
callback_url: str,
app_id: int,
app_secret: str,
verify_token: str,
fields: tuple[str, ...] | None,
delay: int,
) -> None:
threading.Timer(
interval=delay,
function=self._register_callback_url,
kwargs=dict(
callback_url=callback_url,
app_id=app_id,
app_secret=app_secret,
verify_token=verify_token,
fields=fields,
),
).start()

def _register_callback_url(
self,
callback_url: str,
app_id: int,
app_secret: str,
verify_token: str,
fields: tuple[str, ...] | None,
) -> None:
"""
This is a non-blocking function that registers the callback URL.
It must be called after the server is running so that the challenge can be verified.
"""
try:
app_access_token = self.api.get_app_access_token(
app_id=app_id, app_secret=app_secret
)
# noinspection PyProtectedMember
if not self.api.set_app_callback_url(
app_id=app_id,
app_access_token=app_access_token["access_token"],
callback_url=callback_url,
verify_token=verify_token,
fields=fields,
)["success"]:
raise RuntimeError("Failed to register callback URL.")
_logger.info("Callback URL '%s' registered successfully", callback_url)
except errors.WhatsAppError as e:
raise RuntimeError(
f"Failed to register callback URL '{callback_url}'"
) from e

def __str__(self) -> str:
return f"WhatsApp(phone_id={self.phone_id!r})"

Expand Down
Loading

0 comments on commit 06d3bb7

Please sign in to comment.