-
Notifications
You must be signed in to change notification settings - Fork 2
/
kytos_api_helper.py
219 lines (195 loc) · 7.64 KB
/
kytos_api_helper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
""" This module was created to be the main interface between the telemetry napp and all
other kytos napps' APIs """
from collections import defaultdict
from typing import Union
import httpx
from napps.kytos.telemetry_int import settings
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
from tenacity import (
retry,
retry_if_exception_type,
stop_after_attempt,
wait_combine,
wait_fixed,
wait_random,
)
from kytos.core.retry import before_sleep
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def get_evcs(**kwargs) -> dict:
    """Get all non-archived EVCs from the mef_eline API.

    Any kwargs are appended verbatim as extra query-string arguments
    (values are not URL-encoded here; callers pass simple scalars).

    Returns:
        The decoded JSON response (EVCs keyed by id).

    Raises:
        httpx.RequestError: on a 5xx response (transient; retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    archived = "false"
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        endpoint = f"/evc/?archived={archived}"
        if kwargs:
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
        response = await client.get(endpoint, timeout=10)
        if response.is_server_error:
            # 5xx is treated as transient so the retry decorator kicks in
            raise httpx.RequestError(response.text)
        if not response.is_success:
            # Fixed: trailing space added so the message doesn't read
            # "archived falsestatus code ..." (matches sibling functions).
            raise UnrecoverableError(
                f"Failed to get_evcs archived {archived} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return response.json()
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_evc(evc_id: str, exclude_archived=True) -> dict:
    """Fetch a single EVC from mef_eline, returned as {evc_id: evc_data}.

    Returns an empty dict when the EVC doesn't exist (404) or when it is
    archived and exclude_archived is set.

    Raises:
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.get(f"/evc/{evc_id}", timeout=10)
        if resp.status_code == 404:
            return {}
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        if not resp.is_success:
            raise UnrecoverableError(
                f"Failed to get_evc id {evc_id} "
                f"status code {resp.status_code}, response text: {resp.text}"
            )
        evc = resp.json()
        if exclude_archived and evc["archived"]:
            return {}
        return {evc["id"]: evc}
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_stored_flows(
    cookies: list[Union[int, tuple[int, int]]] = None,
) -> dict[int, list[dict]]:
    """Get flow_manager stored_flows grouped by cookies given a list of cookies.

    Each int cookie becomes a degenerate [gte, lte] range; each 2-tuple is
    used as an explicit [gte, lte] pair. Anything else is silently skipped.

    Raises:
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    cookies = cookies or []
    range_pairs: list[int] = []
    for cookie in cookies:
        if isinstance(cookie, int):
            # single cookie -> gte == lte
            range_pairs.extend((cookie, cookie))
        elif isinstance(cookie, tuple) and len(cookie) == 2:
            low, high = cookie
            # explicit (gte, lte) range
            range_pairs.extend((low, high))
    endpoint = "stored_flows?state=installed&state=pending"
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
        if range_pairs:
            # GET with a JSON body, hence client.request instead of client.get
            resp = await client.request(
                "GET",
                f"/{endpoint}",
                json={"cookie_range": range_pairs},
                timeout=10,
            )
        else:
            resp = await client.get(f"/{endpoint}", timeout=10)
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        if not resp.is_success:
            raise UnrecoverableError(
                f"Failed to get_stored_flows cookies {cookies} "
                f"status code {resp.status_code}, response text: {resp.text}"
            )
        return _map_stored_flows_by_cookies(resp.json())
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
"""Map stored flows by cookies.
This is for mapping the data by cookies, just to it can be
reused upfront by bulk operations.
"""
flows_by_cookies = defaultdict(list)
for flows in stored_flows.values():
for flow in flows:
flows_by_cookies[flow["flow"]["cookie"]].append(flow)
return flows_by_cookies
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_evcs_metadata(
    evcs: dict[str, dict], new_metadata: dict, force=False
) -> dict:
    """Bulk-add metadata to the given EVCs via mef_eline.

    Only truthy EVC entries are updated; returns {} early when there is
    nothing to do. With force=True a 404 is tolerated (returns {}), which
    simplifies concurrent callers.

    Raises:
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
    if not circuit_ids:
        # nothing to update, skip the request entirely
        return {}
    payload = {**new_metadata, "circuit_ids": circuit_ids}
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.post("/evc/metadata", timeout=10, json=payload)
        if resp.is_success:
            return resp.json()
        # Ignore 404 if force just so it's easier to handle this concurrently
        if force and resp.status_code == 404:
            return {}
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_proxy_port_metadata(intf_id: str, port_no: int) -> dict:
    """Set the proxy_port metadata on a topology interface.

    Raises:
        ValueError: when the interface id is unknown (404).
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    payload = {"proxy_port": port_no}
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
        resp = await client.post(
            f"/interfaces/{intf_id}/metadata", json=payload, timeout=10
        )
        if resp.is_success:
            return resp.json()
        if resp.status_code == 404:
            raise ValueError(f"interface_id {intf_id} not found")
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to add_proxy_port {port_no} metadata for intf_id {intf_id} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def delete_proxy_port_metadata(intf_id: str) -> dict:
    """Remove the proxy_port metadata from a topology interface.

    Raises:
        ValueError: when the interface or its proxy_port metadata
            is unknown (404).
        httpx.RequestError: on a 5xx response (retried by tenacity).
        UnrecoverableError: on any other non-2xx response.
    """
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
        resp = await client.delete(
            f"/interfaces/{intf_id}/metadata/proxy_port", timeout=10
        )
        if resp.is_success:
            return resp.json()
        if resp.status_code == 404:
            raise ValueError(f"interface_id {intf_id} or metadata proxy_port not found")
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        raise UnrecoverableError(
            f"Failed to delete_proxy_port metadata for intf_id {intf_id} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )