Skip to content

Commit 4be0c63

Browse files
authored
Offload client registration to queue (#255)
* Add retries to Sendgrid mailbox setup * Make MX record setup idempotent * Remove auth caching * Offload client registration to queue
1 parent 0c7e79a commit 4be0c63

File tree

20 files changed

+372
-109
lines changed

20 files changed

+372
-109
lines changed

docker/app/run-celery.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#!/usr/bin/env bash
22

33
if [[ "${CELERY_QUEUE_NAMES}" = "all" ]]; then
4-
CELERY_QUEUE_NAMES="inbound,written,send,mailboxreceived,mailboxsent"
4+
CELERY_QUEUE_NAMES="register,inbound,written,send,mailboxreceived,mailboxsent"
55
fi
66

77
"${PY_ENV}/bin/celery" \

docker/integtest/1-register-client.sh

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,12 @@ curl -fs \
1313
-H "Content-Type: application/json" \
1414
-u "${REGISTRATION_CREDENTIALS}" \
1515
-d '{"domain":"developer1.lokole.ca"}' \
16-
"http://nginx:8888/api/email/register/" \
17-
| tee "${out_dir}/register1.json"
16+
"http://nginx:8888/api/email/register/"
17+
18+
while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer1.lokole.ca" | tee "${out_dir}/register1.json"; do
19+
log "Waiting for client 1 registration"
20+
sleep 1s
21+
done
1822

1923
# registering a client with bad credentials should fail
2024
if curl -fs \
@@ -29,8 +33,12 @@ curl -fs \
2933
-H "Content-Type: application/json" \
3034
-u "${REGISTRATION_CREDENTIALS}" \
3135
-d '{"domain":"developer2.lokole.ca"}' \
32-
"http://nginx:8888/api/email/register/" \
33-
| tee "${out_dir}/register2.json"
36+
"http://nginx:8888/api/email/register/"
37+
38+
while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer2.lokole.ca" | tee "${out_dir}/register2.json"; do
39+
log "Waiting for client 2 registration"
40+
sleep 1s
41+
done
3442

3543
# after creating a client, creating the same one again should fail but we should be able to delete it
3644
curl -fs \
@@ -39,6 +47,11 @@ curl -fs \
3947
-d '{"domain":"developer3.lokole.ca"}' \
4048
"http://nginx:8888/api/email/register/"
4149

50+
while ! curl -fs -u "${REGISTRATION_CREDENTIALS}" "http://nginx:8888/api/email/register/developer3.lokole.ca"; do
51+
log "Waiting for client 3 registration"
52+
sleep 1s
53+
done
54+
4255
if curl -fs \
4356
-H "Content-Type: application/json" \
4457
-u "${REGISTRATION_CREDENTIALS}" \

opwen_email_server/actions.py

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -313,22 +313,55 @@ def __init__(self,
313313
self._setup_mx_records = setup_mx_records
314314
self._client_id_source = client_id_source or new_client_id
315315

316+
def _action(self, domain, owner): # type: ignore
317+
client_id = self._client_id_source()
318+
319+
self._setup_mailbox(client_id, domain)
320+
self._setup_mx_records(domain)
321+
self._client_storage.ensure_exists()
322+
self._auth.insert(client_id, domain, owner)
323+
324+
self.log_event(events.NEW_CLIENT_REGISTERED, {'domain': domain}) # noqa: E501 # yapf: disable
325+
return 'OK', 200
326+
327+
328+
class CreateClient(_Action):
329+
def __init__(self, auth: AzureAuth, task: Callable[[str, str], None]):
330+
self._auth = auth
331+
self._task = task
332+
316333
def _action(self, client, **auth_args): # type: ignore
317334
domain = client['domain']
318335
if not is_lowercase(domain):
319336
return 'domain must be lowercase', 400
320337
if self._auth.client_id_for(domain) is not None:
321338
return 'client already exists', 409
322339

323-
client_id = self._client_id_source()
324-
access_info = self._client_storage.access_info()
340+
self._task(domain, auth_args.get('user'))
325341

326-
self._setup_mailbox(client_id, domain)
327-
self._setup_mx_records(domain)
328-
self._client_storage.ensure_exists()
329-
self._auth.insert(client_id, domain, auth_args.get('user'))
342+
self.log_event(events.CLIENT_CREATED, {'domain': domain}) # noqa: E501 # yapf: disable
343+
return 'accepted', 201
330344

331-
self.log_event(events.NEW_CLIENT_REGISTERED, {'domain': domain}) # noqa: E501 # yapf: disable
345+
346+
class GetClient(_Action):
347+
def __init__(self, auth: AzureAuth, client_storage: AzureObjectsStorage):
348+
self._auth = auth
349+
self._client_storage = client_storage
350+
351+
def _action(self, domain, **auth_args): # type: ignore
352+
if not is_lowercase(domain):
353+
return 'domain must be lowercase', 400
354+
355+
client_id = self._auth.client_id_for(domain)
356+
if client_id is None:
357+
return 'client does not exist', 404
358+
359+
if not self._auth.is_owner(domain, auth_args.get('user')):
360+
return 'client does not belong to the user', 403
361+
362+
access_info = self._client_storage.access_info()
363+
364+
self.log_event(events.CLIENT_FETCHED, {'domain': domain}) # noqa: E501 # yapf: disable
332365
return {
333366
'client_id': client_id,
334367
'storage_account': access_info.account,

opwen_email_server/constants/cache.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
11
from typing_extensions import Final # noqa: F401
22

3-
AUTH_DOMAIN_CACHE_SIZE = 128 # type: Final
43
PENDING_STORAGE_CACHE_SIZE = 128 # type: Final

opwen_email_server/constants/events.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
from typing_extensions import Final # noqa: F401
22

33
CLIENT_DELETED = 'client_deleted' # type: Final
4+
CLIENT_FETCHED = 'client_fetched' # type: Final
5+
CLIENT_CREATED = 'client_created' # type: Final
46
NEW_CLIENT_REGISTERED = 'new_client_registered' # type: Final
57
UNREGISTERED_CLIENT = 'unregistered_client' # type: Final
68
UNKNOWN_USER = 'unknown_user' # type: Final

opwen_email_server/constants/queues.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from typing_extensions import Final # noqa: F401
22

3+
REGISTER_CLIENT_QUEUE = 'register' # type: Final
34
INBOUND_STORE_QUEUE = 'inbound' # type: Final
45
WRITTEN_STORE_QUEUE = 'written' # type: Final
56
SEND_QUEUE = 'send' # type: Final

opwen_email_server/constants/sendgrid.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing_extensions import Final # noqa: F401
22

33
MAILBOX_CREATE_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings' # type: Final # noqa: E501 # yapf: disable
4-
MAILBOX_DELETE_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings/{}' # type: Final # noqa: E501 # yapf: disable
4+
MAILBOX_DETAIL_URL = 'https://api.sendgrid.com/v3/user/webhooks/parse/settings/{}' # type: Final # noqa: E501 # yapf: disable
55

66
INBOX_URL = 'https://mailserver.lokole.ca/api/email/sendgrid/{}' # type: Final
77

opwen_email_server/integration/celery.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,42 @@
22

33
from opwen_email_server.actions import IndexReceivedEmailForMailbox
44
from opwen_email_server.actions import IndexSentEmailForMailbox
5+
from opwen_email_server.actions import RegisterClient
56
from opwen_email_server.actions import SendOutboundEmails
67
from opwen_email_server.actions import StoreInboundEmails
78
from opwen_email_server.actions import StoreWrittenClientEmails
89
from opwen_email_server.config import QUEUE_BROKER
910
from opwen_email_server.constants.queues import INBOUND_STORE_QUEUE
1011
from opwen_email_server.constants.queues import MAILBOX_RECEIVED_QUEUE
1112
from opwen_email_server.constants.queues import MAILBOX_SENT_QUEUE
13+
from opwen_email_server.constants.queues import REGISTER_CLIENT_QUEUE
1214
from opwen_email_server.constants.queues import SEND_QUEUE
1315
from opwen_email_server.constants.queues import WRITTEN_STORE_QUEUE
16+
from opwen_email_server.integration.azure import get_auth
1417
from opwen_email_server.integration.azure import get_client_storage
1518
from opwen_email_server.integration.azure import get_email_sender
1619
from opwen_email_server.integration.azure import get_email_storage
20+
from opwen_email_server.integration.azure import get_mailbox_setup
1721
from opwen_email_server.integration.azure import get_mailbox_storage
22+
from opwen_email_server.integration.azure import get_mx_setup
1823
from opwen_email_server.integration.azure import get_pending_storage
1924
from opwen_email_server.integration.azure import get_raw_email_storage
2025

2126
celery = Celery(broker=QUEUE_BROKER)
2227

2328

29+
@celery.task(ignore_result=True)
30+
def register_client(domain: str, owner: str) -> None:
31+
action = RegisterClient(
32+
auth=get_auth(),
33+
client_storage=get_client_storage(),
34+
setup_mailbox=get_mailbox_setup(),
35+
setup_mx_records=get_mx_setup(),
36+
)
37+
38+
action(domain, owner)
39+
40+
2441
@celery.task(ignore_result=True)
2542
def index_received_email_for_mailbox(resource_id: str) -> None:
2643
action = IndexReceivedEmailForMailbox(
@@ -85,6 +102,7 @@ def _fqn(task):
85102

86103
celery.conf.update(
87104
task_routes={
105+
_fqn(register_client): {'queue': REGISTER_CLIENT_QUEUE},
88106
_fqn(index_received_email_for_mailbox): {'queue': MAILBOX_RECEIVED_QUEUE},
89107
_fqn(index_sent_email_for_mailbox): {'queue': MAILBOX_SENT_QUEUE},
90108
_fqn(inbound_store): {'queue': INBOUND_STORE_QUEUE},

opwen_email_server/integration/connexion.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
from opwen_email_server import config
22
from opwen_email_server.actions import CalculatePendingEmailsMetric
3+
from opwen_email_server.actions import CreateClient
34
from opwen_email_server.actions import DeleteClient
45
from opwen_email_server.actions import DownloadClientEmails
6+
from opwen_email_server.actions import GetClient
57
from opwen_email_server.actions import Ping
68
from opwen_email_server.actions import ReceiveInboundEmail
7-
from opwen_email_server.actions import RegisterClient
89
from opwen_email_server.actions import UploadClientEmails
910
from opwen_email_server.integration.azure import get_auth
1011
from opwen_email_server.integration.azure import get_client_storage
1112
from opwen_email_server.integration.azure import get_email_storage
1213
from opwen_email_server.integration.azure import get_mailbox_deletion
13-
from opwen_email_server.integration.azure import get_mailbox_setup
1414
from opwen_email_server.integration.azure import get_mx_deletion
15-
from opwen_email_server.integration.azure import get_mx_setup
1615
from opwen_email_server.integration.azure import get_pending_storage
1716
from opwen_email_server.integration.azure import get_raw_email_storage
1817
from opwen_email_server.integration.celery import inbound_store
18+
from opwen_email_server.integration.celery import register_client
1919
from opwen_email_server.integration.celery import written_store
2020
from opwen_email_server.services.auth import AnyOfBasicAuth
2121
from opwen_email_server.services.auth import BasicAuth
@@ -39,11 +39,14 @@
3939
pending_factory=get_pending_storage,
4040
)
4141

42-
client_register = RegisterClient(
42+
client_create = CreateClient(
43+
auth=get_auth(),
44+
task=register_client.delay,
45+
)
46+
47+
client_get = GetClient(
4348
auth=get_auth(),
4449
client_storage=get_client_storage(),
45-
setup_mailbox=get_mailbox_setup(),
46-
setup_mx_records=get_mx_setup(),
4750
)
4851

4952
client_delete = DeleteClient(

opwen_email_server/services/auth.py

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from functools import lru_cache
21
from json import JSONDecodeError
32
from typing import Callable
43
from typing import Dict
@@ -12,7 +11,6 @@
1211

1312
from opwen_email_server.constants import events
1413
from opwen_email_server.constants import github
15-
from opwen_email_server.constants.cache import AUTH_DOMAIN_CACHE_SIZE
1614
from opwen_email_server.services.storage import AzureTextStorage
1715
from opwen_email_server.utils.log import LogMixin
1816
from opwen_email_server.utils.serialization import from_json
@@ -164,7 +162,6 @@ def is_owner(self, domain: str, username: str) -> bool:
164162
def delete(self, client_id: str, domain: str) -> bool:
165163
self._storage.delete(domain)
166164
self._storage.delete(client_id)
167-
self._domain_for_cached.cache_clear()
168165
return True
169166

170167
def client_id_for(self, domain: str) -> Optional[str]:
@@ -184,14 +181,10 @@ def client_id_for(self, domain: str) -> Optional[str]:
184181

185182
def domain_for(self, client_id: str) -> Optional[str]:
186183
try:
187-
domain = self._domain_for_cached(client_id)
184+
domain = self._storage.fetch_text(client_id)
188185
except ObjectDoesNotExistError:
189186
self.log_debug('Unrecognized client %s', client_id)
190187
return None
191188
else:
192189
self.log_debug('Client %s has domain %s', client_id, domain)
193190
return domain
194-
195-
@lru_cache(maxsize=AUTH_DOMAIN_CACHE_SIZE)
196-
def _domain_for_cached(self, client_id: str) -> str:
197-
return self._storage.fetch_text(client_id)

opwen_email_server/services/dns.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from cached_property import cached_property
2+
from libcloud.common.types import LibcloudError
23
from libcloud.dns.base import DNSDriver
34
from libcloud.dns.base import Zone
45
from libcloud.dns.providers import get_driver
@@ -48,11 +49,14 @@ def _run(self, client_name: str, zone: Zone) -> None:
4849

4950
class SetupMxRecords(_MxRecords):
5051
def _run(self, client_name: str, zone: Zone) -> None:
51-
self._driver.create_record(
52-
zone=zone,
53-
name=client_name,
54-
type=RecordType.MX,
55-
data=MX_RECORD,
56-
)
57-
58-
self.log_debug('Set up MX records for client %s.%s', client_name, zone.domain)
52+
try:
53+
self._driver.create_record(
54+
zone=zone,
55+
name=client_name,
56+
type=RecordType.MX,
57+
data=MX_RECORD,
58+
)
59+
except LibcloudError:
60+
self.log_debug('MX records for client %s.%s already exist', client_name, zone.domain)
61+
else:
62+
self.log_debug('Set up MX records for client %s.%s', client_name, zone.domain)

opwen_email_server/services/sendgrid.py

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from itertools import count
12
from mimetypes import guess_type
3+
from time import sleep
24
from typing import Callable
35

46
from cached_property import cached_property
57
from python_http_client import BadRequestsError
68
from requests import delete as http_delete
9+
from requests import get as http_get
710
from requests import post as http_post
811
from sendgrid import SendGridAPIClient
912
from sendgrid.helpers.mail import Attachment
@@ -16,7 +19,7 @@
1619

1720
from opwen_email_server.constants.sendgrid import INBOX_URL
1821
from opwen_email_server.constants.sendgrid import MAILBOX_CREATE_URL
19-
from opwen_email_server.constants.sendgrid import MAILBOX_DELETE_URL
22+
from opwen_email_server.constants.sendgrid import MAILBOX_DETAIL_URL
2023
from opwen_email_server.utils.log import LogMixin
2124
from opwen_email_server.utils.serialization import to_base64
2225

@@ -139,7 +142,7 @@ def _run(self, client_id: str, domain: str) -> None:
139142
class DeleteSendgridMailbox(_SendgridManagement):
140143
def _run(self, client_id: str, domain: str) -> None:
141144
http_delete(
142-
url=MAILBOX_DELETE_URL.format(domain),
145+
url=MAILBOX_DETAIL_URL.format(domain),
143146
headers={
144147
'Authorization': f'Bearer {self._key}',
145148
},
@@ -149,18 +152,43 @@ def _run(self, client_id: str, domain: str) -> None:
149152

150153

151154
class SetupSendgridMailbox(_SendgridManagement):
152-
def _run(self, client_id: str, domain: str) -> None:
153-
http_post(
154-
url=MAILBOX_CREATE_URL,
155-
json={
156-
'hostname': domain,
157-
'url': INBOX_URL.format(client_id),
158-
'spam_check': True,
159-
'send_raw': True,
160-
},
161-
headers={
162-
'Authorization': f'Bearer {self._key}',
163-
},
164-
).raise_for_status()
155+
def __init__(self, key: str, max_retries: int = 10, retry_interval_seconds: float = 1):
156+
super().__init__(key)
157+
self._max_retries = max_retries
158+
self._retry_interval_seconds = retry_interval_seconds
165159

166-
self.log_debug('Set up mailbox for %s', domain)
160+
def _run(self, client_id: str, domain: str) -> None:
161+
for retry in count():
162+
get_response = http_get(
163+
url=MAILBOX_DETAIL_URL.format(domain),
164+
headers={
165+
'Authorization': f'Bearer {self._key}',
166+
},
167+
)
168+
169+
if get_response.ok:
170+
self.log_debug('Mailbox %s already exists', domain)
171+
break
172+
173+
create_response = http_post(
174+
url=MAILBOX_CREATE_URL,
175+
json={
176+
'hostname': domain,
177+
'url': INBOX_URL.format(client_id),
178+
'spam_check': True,
179+
'send_raw': True,
180+
},
181+
headers={
182+
'Authorization': f'Bearer {self._key}',
183+
},
184+
)
185+
186+
if create_response.ok:
187+
self.log_debug('Set up mailbox for %s', domain)
188+
break
189+
190+
if retry > self._max_retries:
191+
self.log_debug('Too many attempts to set up mailbox for %s', domain)
192+
create_response.raise_for_status()
193+
194+
sleep(self._retry_interval_seconds)

0 commit comments

Comments
 (0)