Skip to content

Commit

Permalink
Merge pull request #438 from kytos-ng/issue_405
Browse files Browse the repository at this point in the history
Enhance link down handler to avoid race conditions in the failover path
  • Loading branch information
italovalcy authored Mar 8, 2024
2 parents 62c6258 + 9fb511b commit 60ebb1d
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 79 deletions.
33 changes: 31 additions & 2 deletions controllers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,37 @@ def update_evc(self, evc: Dict) -> Optional[Dict]:
)
return updated

def update_evcs(self, circuit_ids: list, metadata: dict, action: str):
"""Update a bulk of EVC"""
def update_evcs(self, evcs: list[dict]) -> int:
    """Persist several EVCs through a single bulk write.

    Every dict in ``evcs`` is stamped in place with one shared
    ``updated_at`` value, passed through ``EVCBaseDoc`` (dropping ``None``
    fields) and turned into one ``UpdateOne`` keyed by the EVC ``id``.

    Returns the ``modified_count`` reported by ``bulk_write``; when
    ``evcs`` is empty no database call is made and 0 is returned.
    """
    if not evcs:
        return 0

    timestamp = datetime.utcnow()
    operations = []
    for evc in evcs:
        # The caller's dict is mutated on purpose, as the model below
        # is built from it.
        evc["updated_at"] = timestamp
        fields = {**evc, "_id": evc["id"]}
        document = EVCBaseDoc(**fields).dict(exclude_none=True)
        selector = {"_id": evc["id"]}
        changes = {
            "$set": document,
            "$setOnInsert": {"inserted_at": timestamp},
        }
        operations.append(UpdateOne(selector, changes))

    result = self.db.evcs.bulk_write(operations)
    return result.modified_count

def update_evcs_metadata(
self, circuit_ids: list, metadata: dict, action: str
):
"""Bulk update EVCs metadata."""
utc_now = datetime.utcnow()
metadata = {f"metadata.{k}": v for k, v in metadata.items()}
if action == "add":
Expand Down
133 changes: 80 additions & 53 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def execute_consistency(self):
stored_circuits.pop(circuit.id, None)
if self.should_be_checked(circuit):
circuits_to_check.append(circuit)
circuit.try_setup_failover_path()
circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
for circuit in circuits_to_check:
is_checked = circuits_checked.get(circuit.id)
Expand Down Expand Up @@ -470,7 +471,7 @@ def bulk_add_metadata(self, request: Request) -> JSONResponse:
data = get_json_or_400(request, self.controller.loop)
circuit_ids = data.pop("circuit_ids")

self.mongo_controller.update_evcs(circuit_ids, data, "add")
self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

fail_evcs = []
for _id in circuit_ids:
Expand Down Expand Up @@ -511,7 +512,9 @@ def bulk_delete_metadata(self, request: Request) -> JSONResponse:
data = get_json_or_400(request, self.controller.loop)
key = request.path_params["key"]
circuit_ids = data.pop("circuit_ids")
self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
self.mongo_controller.update_evcs_metadata(
circuit_ids, {key: ""}, "del"
)

fail_evcs = []
for _id in circuit_ids:
Expand Down Expand Up @@ -800,14 +803,14 @@ def handle_interface_link_down(self, interface):
"""
Handler for interface link_down events
"""
log.info("Event handle_interface_link_down %s", interface)
for evc in self.get_evcs_by_svc_level():
with evc.lock:
log.info("Event handle_interface_link_down %s", interface)
evc.handle_interface_link_down(
interface
)

@listen_to("kytos/topology.link_down")
@listen_to("kytos/topology.link_down", pool="dynamic_single")
def on_link_down(self, event):
    """Listen for topology link_down events.

    The event is forwarded unchanged to ``handle_link_down``.
    """
    self.handle_link_down(event)
Expand All @@ -821,31 +824,39 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
evcs_normal = []
check_failover = []
for evc in self.get_evcs_by_svc_level():
if evc.is_affected_by_link(link):
# if there is no failover path, handles link down the
# tradditional way
if (
not getattr(evc, 'failover_path', None) or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
f"Ignore Failover path for {evc} due to error: {err}"
)
evcs_normal.append(evc)
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
else:
check_failover.append(evc)
with evc.lock:
if evc.is_affected_by_link(link):
evc.affected_by_link_at = event.timestamp
# if there is no failover path, handle link down the
# traditional way
if (
not evc.failover_path or
evc.is_failover_path_affected_by_link(link)
):
evcs_normal.append(evc)
continue
try:
dpid_flows = evc.get_failover_flows()
evc.old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = Path([])
# pylint: disable=broad-except
except Exception:
err = traceback.format_exc().replace("\n", ", ")
log.error(
"Ignore Failover path for "
f"{evc} due to error: {err}"
)
evcs_normal.append(evc)
continue
for dpid, flows in dpid_flows.items():
switch_flows.setdefault(dpid, [])
switch_flows[dpid].extend(flows)
evcs_with_failover.append(evc)
elif evc.is_failover_path_affected_by_link(link):
evc.old_path = evc.failover_path
evc.failover_path = Path([])
check_failover.append(evc)

while switch_flows:
offset = settings.BATCH_SIZE or None
Expand All @@ -866,33 +877,34 @@ def handle_link_down(self, event): # pylint: disable=too-many-branches
switch_flows[dpid] = switch_flows[dpid][offset:]
time.sleep(settings.BATCH_INTERVAL)

for evc in evcs_with_failover:
with evc.lock:
old_path = evc.current_path
evc.current_path = evc.failover_path
evc.failover_path = old_path
evc.sync()
emit_event(self.controller, "redeployed_link_down",
content=map_evc_event_content(evc))
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)

for evc in evcs_normal:
emit_event(
self.controller,
"evc_affected_by_link_down",
content={"link_id": link.id} | map_evc_event_content(evc)
content={"link": link} | map_evc_event_content(evc)
)

# After handling the hot path, check if new failover paths are needed.
# Note that EVCs affected by link down will generate a KytosEvent for
# deployed|redeployed, which will trigger the failover path setup.
# Thus, we just need to further check the check_failover list
evcs_to_update = []
for evc in evcs_with_failover:
evcs_to_update.append(evc.as_dict())
emit_event(
self.controller,
"redeployed_link_down",
content=map_evc_event_content(evc)
)
log.info(
f"{evc} redeployed with failover due to link down {link.id}"
)
for evc in check_failover:
if evc.is_failover_path_affected_by_link(link):
with evc.lock:
evc.setup_failover_path()
evcs_to_update.append(evc.as_dict())

self.mongo_controller.update_evcs(evcs_to_update)

emit_event(
self.controller,
"cleanup_evcs_old_path",
content={"evcs": evcs_with_failover + check_failover}
)

@listen_to("kytos/mef_eline.evc_affected_by_link_down")
def on_evc_affected_by_link_down(self, event):
Expand All @@ -902,14 +914,16 @@ def on_evc_affected_by_link_down(self, event):
def handle_evc_affected_by_link_down(self, event):
"""Change circuit when link is down or under_mantenance."""
evc = self.circuits.get(event.content["evc_id"])
link_id = event.content['link_id']
link = event.content['link']
if not evc:
return
with evc.lock:
if not evc.is_affected_by_link(link):
return
result = evc.handle_link_down()
event_name = "error_redeploy_link_down"
if result:
log.info(f"{evc} redeployed due to link down {link_id}")
log.info(f"{evc} redeployed due to link down {link.id}")
event_name = "redeployed_link_down"
emit_event(self.controller, event_name,
content=map_evc_event_content(evc))
Expand All @@ -924,8 +938,21 @@ def handle_evc_deployed(self, event):
evc = self.circuits.get(event.content["evc_id"])
if not evc:
return
with evc.lock:
evc.setup_failover_path()
evc.try_setup_failover_path()

@listen_to("kytos/mef_eline.cleanup_evcs_old_path")
def on_cleanup_evcs_old_path(self, event):
    """Listen for cleanup_evcs_old_path events.

    The event is forwarded unchanged to ``handle_cleanup_evcs_old_path``.
    """
    self.handle_cleanup_evcs_old_path(event)

def handle_cleanup_evcs_old_path(self, event):
    """Remove the flows of each EVC's old path, then reset that path.

    The EVCs are read from ``event.content["evcs"]``; a missing key means
    there is nothing to do. EVCs whose ``old_path`` is empty are skipped.
    """
    # Lazy filter: ``old_path`` is checked right before each EVC is handled.
    pending = (
        evc for evc in event.content.get("evcs", []) if evc.old_path
    )
    for evc in pending:
        evc.remove_path_flows(evc.old_path)
        evc.old_path = Path([])

@listen_to("kytos/topology.topology_loaded")
def on_topology_loaded(self, event): # pylint: disable=unused-argument
Expand Down
10 changes: 10 additions & 0 deletions models/evc.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ def __init__(self, controller, **kwargs):
self.current_links_cache = set()
self.primary_links_cache = set()
self.backup_links_cache = set()
self.affected_by_link_at = get_time("0001-01-01T00:00:00")
self.old_path = Path([])

self.lock = Lock()

Expand Down Expand Up @@ -872,6 +874,14 @@ def deploy_to_path(self, path=None): # pylint: disable=too-many-branches
log.info(f"{self} was deployed.")
return True

def try_setup_failover_path(self, wait=settings.DEPLOY_EVCS_INTERVAL):
    """Set up the failover path if none exists and enough time has passed.

    Does nothing when ``self.failover_path`` is already set. Otherwise the
    failover path is set up (under ``self.lock``) only when at least
    ``wait`` seconds have elapsed since ``self.affected_by_link_at``.

    Args:
        wait: Minimum number of seconds since the EVC was last affected
            by a link event before a failover path setup is attempted.
    """
    if self.failover_path:
        return
    # total_seconds() is the full elapsed time; timedelta.seconds is only
    # the 0..86399 seconds component and ignores whole days, which made
    # the comparison depend on the time of day.
    elapsed = (now() - self.affected_by_link_at).total_seconds()
    if elapsed >= wait:
        with self.lock:
            self.setup_failover_path()

def setup_failover_path(self):
"""Install flows for the failover path of this EVC.
Expand Down
14 changes: 11 additions & 3 deletions tests/unit/test_controllers.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,19 @@ def test_upsert_evc(self):
self.eline.upsert_evc(self.evc_dict)
assert self.eline.db.evcs.find_one_and_update.call_count == 1

def test_update_evcs(self):
"""Test update_evcs"""
def test_update_evcs_metadata(self):
    """Bulk metadata update sends three operations in one bulk_write."""
    circuit_ids = ["123", "456", "789"]
    self.eline.update_evcs_metadata(circuit_ids, {"info": "testing"}, "add")
    bulk_write = self.eline.db.evcs.bulk_write
    operations = bulk_write.call_args[0][0]
    assert len(operations) == 3
    assert bulk_write.call_count == 1

def test_update_evcs(self):
    """Updating two EVCs sends two operations in one bulk_write."""
    second_evc = {**self.evc_dict, "id": "456"}
    self.eline.update_evcs([self.evc_dict, second_evc])
    bulk_write = self.eline.db.evcs.bulk_write
    operations = bulk_write.call_args[0][0]
    assert len(operations) == 2
    assert bulk_write.call_count == 1
Loading

0 comments on commit 60ebb1d

Please sign in to comment.