-
Notifications
You must be signed in to change notification settings - Fork 0
/
connect.py
142 lines (113 loc) · 4.94 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import authentik_client
from outline import AsyncOutline
from fastapi import FastAPI, Request
import httpx
from dotenv import load_dotenv
import json
import os
import logging
import hmac
import hashlib
import helpers.authentik
import helpers.outline
load_dotenv()
app = FastAPI()
# Logging setup
logger = logging.getLogger("oa-connector")
if os.getenv('DEBUG') == "True":
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(levelname)s: %(name)s: %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
authentik_config = authentik_client.Configuration(
host = f"{os.getenv('AUTHENTIK_URL')}/api/v3",
access_token=os.getenv('AUTHENTIK_TOKEN')
)
outline_client = AsyncOutline(
bearer_token=os.getenv('OUTLINE_TOKEN'),
base_url=os.getenv('OUTLINE_URL')
)
@app.get("/")
def root():
return({'status': 'running'})
@app.post("/sync")
async def sync(request: Request):
logger.debug("Received webhook")
# Verifying webhook signature using secret
body = await request.body()
outline_signature_header = request.headers.get('outline-signature')
if not outline_signature_header:
logger.debug("Request is missing signature")
return({'status': 'missing-signature'})
parts = outline_signature_header.split(',')
if len(parts) != 2:
logger.debug("Request signature is invalid")
return({'status': 'invalid-signature'})
timestamp = parts[0].split('=')[1]
signature = parts[1].split('=')[1]
full_payload = f"{timestamp}.{body.decode('utf-8')}"
digester = hmac.new(os.getenv('OUTLINE_WEBHOOK_SECRET').encode('utf-8'), full_payload.encode('utf-8'), hashlib.sha256)
calculated_signature = digester.hexdigest()
if not hmac.compare_digest(signature, calculated_signature):
logger.debug("Signature calculation failed")
return({'status': 'unauthorized'})
logger.debug("Signature verified, continuing...")
# Processing Outline webhook payload
response = await request.json()
payload = response['payload']
model = payload['model']
outline_id = model['id']
if response['event'] != 'users.signin':
return({'status:': 'wrong-event'})
# Using helper methods to grab groups
authentik_groups = helpers.authentik.get_authentik_groups()
outline_groups = await helpers.outline.get_outline_groups()
matched_groups = []
user_authentik_groups = []
user_outline_groups = []
# Matching groups together by name
for name in authentik_groups:
for outline_group in outline_groups:
if name in outline_group:
matched_groups.append(outline_group)
break
logger.info(f"Matched {len(matched_groups)} groups")
# Getting Outline user's email
user_response = await outline_client.post(path='/api/users.info', cast_to=httpx.Response, body={'id': outline_id})
user = json.loads(await user_response.aread())
email = user['data']['email']
# Getting Authentik user's groups
with authentik_client.ApiClient(authentik_config) as api_client:
api_instance = authentik_client.CoreApi(api_client)
authentik_user = api_instance.core_users_list(email=email).results[0]
for obj in authentik_user.groups_obj:
# Only adding user group if it is in matched_groups
if obj.name in [list(kv.keys())[0] for kv in matched_groups]:
user_authentik_groups.append(obj.name)
logger.debug(f"Got {len(user_authentik_groups)} user groups from Authentik")
# Getting Outline user's groups
for group in outline_groups:
group_id = list(group.values())[0]
membership = await helpers.outline.get_group_membership(group_id, outline_id, user['data']['name'])
if membership:
user_outline_groups.append(group_id)
logger.debug(f"Got {len(user_outline_groups)} user groups from Outline")
# Adding user to matched groups, removing user from unmatched groups
for group in matched_groups:
group_name = list(group.keys())[0]
group_id = list(group.values())[0]
if group_name in user_authentik_groups:
if group_id not in user_outline_groups: # User is in Authentik group but not in Outline group, add
await helpers.outline.add_user_to_group(group_id, outline_id)
else: # User is in both Authentik and Outline groups already
logger.debug(f"User is already in group {group_id}, not adding")
else:
if group_id in user_outline_groups: # User is in Outline group but not in Authentik group, remove
await helpers.outline.remove_user_from_group(group_id, outline_id)
else: # User is not in either Authentik or Outline group
logger.debug(f"User is already not in group {group_id}, not removing")
logger.info("Sync complete!")
return({'status': 'success'})